From b27dc6a673575b57d6c9a6a745710fb945ea7314 Mon Sep 17 00:00:00 2001 From: "C. Weaver" Date: Wed, 12 Jun 2024 16:38:52 -0400 Subject: [PATCH] Expose the capability from adc-streaming 2.4.0 to publish to multiple topics --- doc/user/stream.rst | 25 ++++++++++++++++++--- hop/io.py | 29 +++++++++++++++---------- setup.py | 2 +- tests/conftest.py | 9 ++++++-- tests/test_io.py | 53 +++++++++++++++++++++++++++++++++++---------- 5 files changed, 89 insertions(+), 29 deletions(-) diff --git a/doc/user/stream.rst b/doc/user/stream.rst index 65059a3..7371f2c 100755 --- a/doc/user/stream.rst +++ b/doc/user/stream.rst @@ -82,13 +82,32 @@ Kafka topics, and takes the form: .. code:: bash - kafka://[username@]broker/topic[,topic2[,...]] + kafka://[username@]broker/[topic[,topic2[,...]]] The broker takes the form :code:`hostname[:port]` and gives the URL to connect to a Kafka broker. Optionally, a :code:`username` can be provided, which is used to select among available credentials to use when communicating with the broker. -Finally, one can publish to a topic or subscribe to one or more topics to consume messages -from. +Finally, one can specify a number of topics to which to publish or subscribe. + +Publishing to Multiple Topics +------------------------------- + +A single stream object can be used to publish to multiple topics, and doing so uses resources +more efficiently by spawning fewer worker threads, opening fewer sockets, etc., than opening a +separate stream for each of several topics, but requires attention to one extra detail: When a +stream is opened for multiple topics, the topic must be specified when calling :code:`write()`, +in order to make unambiguous to which topic that particular message should be published: + +.. code:: python + + from hop import stream + + with stream.open("kafka://hostname:port/topic1,topic2", "w") as s: + s.write({"my": "message"}, topic="topic2") + +In fact, when opening a stream for writing, it is not necessary for the target URL to contain +a topic at all; if it does not, the topic to which to publish must always be specified when +calling :code:`write()`. Committing Messages Manually ------------------------------ diff --git a/hop/io.py b/hop/io.py index d15eacb..0490da3 100644 --- a/hop/io.py +++ b/hop/io.py @@ -88,8 +88,8 @@ def open(self, url, mode="r", group_id=None, ignoretest=True, **kwargs): in write mode or a :class:`Consumer` instance in read mode. Raises: - ValueError: If the mode is not set to read/write, if more than - one topic is specified in write mode, or if more than one broker is specified + ValueError: If the mode is not set to read/write, if no topic + is specified in read mode, or if more than one broker is specified """ username, broker_addresses, topics = kafka.parse_kafka_url(url) @@ -98,21 +98,20 @@ def open(self, url, mode="r", group_id=None, ignoretest=True, **kwargs): logger.debug("connecting to addresses=%s username=%s topics=%s", broker_addresses, group_id, topics) - if topics is None: - raise ValueError("no topic(s) specified in kafka URL") - if self.auth is not None: credential = select_matching_auth(self.auth, broker_addresses[0], username) else: credential = None if mode == "w": - if len(topics) != 1: - raise ValueError("must specify exactly one topic in write mode") + if topics is None or len(topics) != 1: + topics = [None] if group_id is not None: warnings.warn("group ID has no effect when opening a stream in write mode") return Producer(broker_addresses, topics[0], auth=credential, **kwargs) elif mode == "r": + if topics is None or len(topics) == 0: + raise ValueError("no topic(s) specified in kafka URL") if group_id is None: username = credential.username if credential is not None else None group_id = _generate_group_id(username, 10) @@ -220,7 +219,7 @@ def from_format(data, format, deserialize=True): return models.JSONBlob(content=old) # if we can't tell what the data is, pass it on unchanged except (UnicodeDecodeError, json.JSONDecodeError): - logger.warning("Unknown message format; returning a Blob") + logger.info("Unknown message format; returning a Blob") return models.Blob(content=message.value()) def load(self, input_): @@ -470,7 +469,7 @@ def __init__(self, broker_addresses, topic, auth, **kwargs): logger.info(f"publishing to topic: {topic}") def write(self, message, headers=None, - delivery_callback=errors.raise_delivery_errors, test=False): + delivery_callback=errors.raise_delivery_errors, test=False, topic=None): """Write messages to a stream. @@ -484,12 +483,15 @@ def write(self, message, headers=None, is either delivered or permenantly fails to be delivered. test: Message should be marked as a test message by adding a header with key '_test'. + topic: The topic to which the message should be sent. This need not be specified if + the stream was opened with a URL containing exactly one topic name. """ message, headers = self._pack(message, headers, test=test) - self._producer.write(message, headers=headers, delivery_callback=delivery_callback) + self._producer.write(message, headers=headers, delivery_callback=delivery_callback, + topic=topic) def write_raw(self, packed_message, headers=None, - delivery_callback=errors.raise_delivery_errors): + delivery_callback=errors.raise_delivery_errors, topic=None): """Write a pre-encoded message to the stream. This is an advanced interface; for most purposes it is preferrable to use @@ -502,9 +504,12 @@ def write_raw(self, packed_message, headers=None, mapping strings to strings, or as a list of 2-tuples of strings. delivery_callback: A callback which will be called when each message is either delivered or permenantly fails to be delivered. + topic: The topic to which the message should be sent. This need not be specified if + the stream was opened with a URL containing exactly one topic name. """ - self._producer.write(packed_message, headers=headers, delivery_callback=delivery_callback) + self._producer.write(packed_message, headers=headers, delivery_callback=delivery_callback, + topic=topic) @staticmethod def pack(message, headers=None, test=False, auth=None): diff --git a/setup.py b/setup.py index 62e02b1..a436135 100755 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ # requirements install_requires = [ - "adc-streaming >= 2.2.0", + "adc-streaming >= 2.4.0", "dataclasses ; python_version < '3.7'", "fastavro >= 1.4.0", "pluggy >= 0.11", diff --git a/tests/conftest.py b/tests/conftest.py index b77ef46..e2e523d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -367,8 +367,13 @@ def __init__(self, broker, topic): self.broker = broker self.topic = topic - def write(self, msg, headers=[], delivery_callback=None): - self.broker.write(self.topic, msg, headers) + def write(self, msg, headers=[], delivery_callback=None, topic=None): + if topic is None: + if self.topic is not None: + topic = self.topic + else: + raise Exception("No topic specified for write") + self.broker.write(topic, msg, headers) def close(self): pass diff --git a/tests/test_io.py b/tests/test_io.py index ca5e659..8270872 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -281,9 +281,7 @@ def test_stream_stop(circular_msg): def test_stream_write(circular_msg, circular_text, mock_broker, mock_producer): topic = "gcn" - mock_adc_producer = mock_producer(mock_broker, topic) expected_msg = make_message_standard(circular_msg) - fixed_uuid = uuid4() auth = Auth("user", "password") @@ -298,7 +296,8 @@ def test_stream_write(circular_msg, circular_text, mock_broker, mock_producer): none_test_headers = [("_id", fixed_uuid.bytes), ("_sender", auth.username.encode("utf-8")), ('_test', b"true"), ("_format", b"circular")] - with patch("hop.io.producer.Producer", autospec=True, return_value=mock_adc_producer), \ + mb = mock_broker + with patch("hop.io.producer.Producer", side_effect=lambda c: mock_producer(mb, c.topic)), \ patch("hop.io.uuid4", MagicMock(return_value=fixed_uuid)): broker_url = f"kafka://localhost:port/{topic}" @@ -307,10 +306,6 @@ def test_stream_write(circular_msg, circular_text, mock_broker, mock_producer): stream = io.Stream(start_at=start_at, until_eos=until_eos, auth=auth) - # verify only 1 topic is allowed in write mode - with pytest.raises(ValueError): - stream.open("kafka://localhost:9092/topic1,topic2", "w") - # verify warning is raised when groupid is set in write mode with pytest.warns(UserWarning): stream.open("kafka://localhost:9092/topic1", "w", group_id="group") @@ -337,14 +332,40 @@ def test_stream_write(circular_msg, circular_text, mock_broker, mock_producer): s.close() assert mock_broker.has_message(topic, expected_msg.value(), canonical_headers) + mock_broker.reset() + # more than one topics should now be allowed in write mode + with stream.open("kafka://localhost:9092/topic1,topic2", "w") as s: + with pytest.raises(Exception): + # however, a topic must be specified when calling write with multiple topics + # specified on construction + s.write(circular_msg, headers) + + # selecting a topic explicitly when calling write should work + s.write(circular_msg, headers, topic="topic1") + assert mock_broker.has_message("topic1", expected_msg.value(), canonical_headers) + s.write(circular_msg, headers, topic="topic2") + assert mock_broker.has_message("topic2", expected_msg.value(), canonical_headers) + + mock_broker.reset() + # no topic can also be specified in write mode + with stream.open("kafka://localhost:9092/", "w") as s: + with pytest.raises(Exception): + # however, a topic must be specified when calling write + s.write(circular_msg, headers) + + s.write(circular_msg, headers, topic="topic1") + assert mock_broker.has_message("topic1", expected_msg.value(), canonical_headers) + s.write(circular_msg, headers, topic="topic2") + assert mock_broker.has_message("topic2", expected_msg.value(), canonical_headers) + def test_stream_write_raw(circular_msg, circular_text, mock_broker, mock_producer): topic = "gcn" - mock_adc_producer = mock_producer(mock_broker, topic) encoded_msg = io.Producer.pack(circular_msg) headers = {"some header": "some value"} canonical_headers = list(headers.items()) - with patch("hop.io.producer.Producer", autospec=True, return_value=mock_adc_producer): + mb = mock_broker + with patch("hop.io.producer.Producer", side_effect=lambda c: mock_producer(mb, c.topic)): stream = io.Stream(auth=False) broker_url = f"kafka://localhost:9092/{topic}" @@ -361,6 +382,14 @@ def test_stream_write_raw(circular_msg, circular_text, mock_broker, mock_produce s.close() assert mock_broker.has_message(topic, encoded_msg, canonical_headers) + with stream.open("kafka://localhost:9092/topic1,topic2", "w") as s: + with pytest.raises(Exception): + s.write_raw(encoded_msg, canonical_headers) + s.write_raw(encoded_msg, canonical_headers, topic="topic1") + assert mock_broker.has_message("topic1", encoded_msg, canonical_headers) + s.write_raw(encoded_msg, canonical_headers, topic="topic2") + assert mock_broker.has_message("topic2", encoded_msg, canonical_headers) + def test_stream_auth(auth_config, tmpdir): # turning off authentication should give None for the auth property @@ -385,7 +414,7 @@ def test_stream_auth(auth_config, tmpdir): assert s4.auth == "blarg" -def test_stream_open(auth_config, tmpdir): +def test_stream_open(auth_config, mock_broker, mock_producer, tmpdir): stream = io.Stream(auth=False) # verify only read/writes are allowed @@ -398,7 +427,7 @@ def test_stream_open(auth_config, tmpdir): stream.open("bad://example.com/topic", "r") assert "invalid kafka URL: must start with 'kafka://'" in err.value.args - # verify that URLs with no topic are rejected + # verify that URLs with no topic are rejected when reading with pytest.raises(ValueError) as err: stream.open("kafka://example.com/", "r") assert "no topic(s) specified in kafka URL" in err.value.args @@ -409,7 +438,9 @@ def test_stream_open(auth_config, tmpdir): assert "Multiple broker addresses are not supported" in err.value.args # verify that complete URLs are accepted + mb = mock_broker with temp_config(tmpdir, auth_config) as config_dir, temp_environ(XDG_CONFIG_HOME=config_dir), \ + patch("hop.io.producer.Producer", side_effect=lambda c: mock_producer(mb, c.topic)), \ patch("adc.consumer.Consumer.subscribe", MagicMock()) as subscribe: stream = io.Stream() # opening a valid URL for reading should succeed