diff --git a/README.md b/README.md index a94459f..39f868e 100644 --- a/README.md +++ b/README.md @@ -82,7 +82,7 @@ consumer = Consumer(client_id='fill me in', A key feature of kafka consumer clients is the ability to perform persistent tracking of which messages have been read. This allows clients to recover missed messages after a restart by beginning at the earliest unread message rather than the next available message from the stream. In order to enable this feature, you will need to set a client Group ID using the configuration dictionary argument for the Consumer class as well as change the auto offset reset option to the ‘earliest’ setting. Once this is done, every new client with the given Group ID will begin reading the specified topic at the earliest unread message. When doing this, it is recommended to turn OFF the auto commit feature because it can lose track of the last read message if the client crashes before the auto commit interval (5 seconds by default) occurs. Manually committing messages (i.e. storing the state of the last read message) once they are read is the most robust method for tracking the last read message. -Example code: +Example code: ```python3 from gcn_kafka import Consumer diff --git a/gcn_kafka/__init__.py b/gcn_kafka/__init__.py index 6efbe68..78579fb 100644 --- a/gcn_kafka/__init__.py +++ b/gcn_kafka/__init__.py @@ -1,7 +1,7 @@ # SPDX-License-Identifier: CC0-1.0 -from .core import Consumer, Producer, AdminClient +from ._version import version as __version__ # noqa: F401 +from .core import AdminClient, Consumer, Producer from .env import config_from_env -from ._version import version as __version__ __all__ = ("config_from_env", "Consumer", "Producer", "AdminClient") diff --git a/gcn_kafka/core.py b/gcn_kafka/core.py index d5d8e95..c350088 100644 --- a/gcn_kafka/core.py +++ b/gcn_kafka/core.py @@ -37,14 +37,14 @@ def get_config(mode, config, **kwargs): if client_secret: config.setdefault("sasl.oauthbearer.client.secret", client_secret) config.setdefault( - "sasl.oauthbearer.token.endpoint.url", - f"https://auth.{domain}/oauth2/token") + "sasl.oauthbearer.token.endpoint.url", f"https://auth.{domain}/oauth2/token" + ) if mode == "consumer" and not config.get("group.id"): config["group.id"] = str(uuid4()) if mode == "producer": - config.setdefault('compression.type', 'zstd') + config.setdefault("compression.type", "zstd") set_oauth_cb(config) return config diff --git a/gcn_kafka/env.py b/gcn_kafka/env.py index 7e0d7a8..f4a97fa 100644 --- a/gcn_kafka/env.py +++ b/gcn_kafka/env.py @@ -4,15 +4,15 @@ import re from typing import Mapping, Optional -env_key_splitter = re.compile(r'_+') -replacement_dict = {'_': '.', '__': '-', '___': '_'} +env_key_splitter = re.compile(r"_+") +replacement_dict = {"_": ".", "__": "-", "___": "_"} # Adapted from https://peps.python.org/pep-0616/ # # FIXME: Remove after dropping support for Python 3.8 def removeprefix(self: str, prefix: str) -> str: if self.startswith(prefix): - return self[len(prefix):] + return self[len(prefix) :] else: return self[:] @@ -22,7 +22,9 @@ def replacement(match: re.Match) -> str: return replacement_dict.get(text) or text -def config_from_env(env: Optional[Mapping[str, str]] = None, prefix: str = 'KAFKA_') -> Mapping[str, str]: +def config_from_env( + env: Optional[Mapping[str, str]] = None, prefix: str = "KAFKA_" +) -> Mapping[str, str]: """Construct a Kafka client configuration dictionary from env variables. This uses the same rules as https://docs.confluent.io/platform/current/installation/docker/config-reference.html diff --git a/gcn_kafka/test/test_core.py b/gcn_kafka/test/test_core.py index 5bf1088..9fa8d05 100644 --- a/gcn_kafka/test/test_core.py +++ b/gcn_kafka/test/test_core.py @@ -3,32 +3,28 @@ def test_update_config_no_overwrite(): config = { - 'client_id':'qwertyuiopasdfghjklzxcvbnm', - 'client_secret':'qwertyuiopljhgfdsazxcvbnmlkjhgfdsaertyuio', - 'domain':'test.test.test' + "client_id": "qwertyuiopasdfghjklzxcvbnm", + "client_secret": "qwertyuiopljhgfdsazxcvbnmlkjhgfdsaertyuio", + "domain": "test.test.test", } - newConfig = update_config( - config, - client_id=None, - client_secret=None - ) + newConfig = update_config(config, client_id=None, client_secret=None) assert newConfig == config def test_update_config_with_overwrite(): config = { - 'client_id':'qwertyuiopasdfghjklzxcvbnm', - 'client_secret':'qwertyuiopljhgfdsazxcvbnmlkjhgfdsaertyuio', - 'domain':'test.test.test' + "client_id": "qwertyuiopasdfghjklzxcvbnm", + "client_secret": "qwertyuiopljhgfdsazxcvbnmlkjhgfdsaertyuio", + "domain": "test.test.test", } newConfig = update_config( config, client_id="client_id update Success", - client_secret="client_secret update Success" + client_secret="client_secret update Success", ) - assert newConfig['client_id'] == "client_id update Success" - assert newConfig['client_secret'] == "client_secret update Success" + assert newConfig["client_id"] == "client_id update Success" + assert newConfig["client_secret"] == "client_secret update Success" diff --git a/gcn_kafka/test/test_env.py b/gcn_kafka/test/test_env.py index b2bc09f..f15671e 100644 --- a/gcn_kafka/test/test_env.py +++ b/gcn_kafka/test/test_env.py @@ -6,13 +6,12 @@ def test_config_from_env(monkeypatch): - env = {'FOO_BAR_BAT__BAZ___': '123', - 'XYZZ_BAR_BAT__BAZ___': '456'} + env = {"FOO_BAR_BAT__BAZ___": "123", "XYZZ_BAR_BAT__BAZ___": "456"} - config = config_from_env(env, 'FOO_') - assert config == {'bar.bat-baz_': '123'} + config = config_from_env(env, "FOO_") + assert config == {"bar.bat-baz_": "123"} - monkeypatch.setattr(os, 'environ', env) + monkeypatch.setattr(os, "environ", env) - config = config_from_env(prefix='FOO_') - assert config == {'bar.bat-baz_': '123'} + config = config_from_env(prefix="FOO_") + assert config == {"bar.bat-baz_": "123"} diff --git a/gcn_kafka/test/test_oidc.py b/gcn_kafka/test/test_oidc.py index ff7aa44..c434847 100644 --- a/gcn_kafka/test/test_oidc.py +++ b/gcn_kafka/test/test_oidc.py @@ -11,21 +11,23 @@ def test_no_oidc(): def test_oidc(monkeypatch): mock_session_class = MagicMock() - monkeypatch.setattr(oidc, 'OAuth2Session', mock_session_class) + monkeypatch.setattr(oidc, "OAuth2Session", mock_session_class) config = { - 'sasl.oauthbearer.method': 'oidc', - 'sasl.oauthbearer.client.id': 'client_id', - 'sasl.oauthbearer.client.secret': 'client_secret', - 'sasl.oauthbearer.scope': 'scope', - 'sasl.oauthbearer.token.endpoint.url': 'token_endpoint' + "sasl.oauthbearer.method": "oidc", + "sasl.oauthbearer.client.id": "client_id", + "sasl.oauthbearer.client.secret": "client_secret", + "sasl.oauthbearer.scope": "scope", + "sasl.oauthbearer.token.endpoint.url": "token_endpoint", } oidc.set_oauth_cb(config) - oauth_cb = config.pop('oauth_cb') + oauth_cb = config.pop("oauth_cb") assert config == {} mock_session_class.assert_called_once_with( - 'client_id', 'client_secret', scope='scope') + "client_id", "client_secret", scope="scope" + ) oauth_cb() mock_session_class.return_value.fetch_token.assert_called_once_with( - "token_endpoint", grant_type="client_credentials") + "token_endpoint", grant_type="client_credentials" + )