Description
Bug Report
YDB Python SDK version:
3.18.2
Environment
Ubuntu 22.04.4, Python 3.7
Current behavior:
When several batches are written into a topic with the same `producer_id` using `await asyncio.wait_for(writer.write(batch), timeout=...)`, only a random subset of the messages from those batches ends up in the topic after an explicit `flush()` call. The bug does not manifest if every batch uses a different `producer_id`, or if `writer.write()` is awaited without `asyncio.wait_for`.
Expected behavior:
`writer.write()` behaves the same whether or not it is wrapped in `asyncio.wait_for`, regardless of `producer_id`.
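A minimal sketch of the expected property, using a hypothetical in-memory `FakeWriter` (not a YDB SDK class) in place of the real topic writer. When the write finishes well before the timeout, wrapping it in `asyncio.wait_for` should leave exactly the same messages flushed as a plain `await`. `write_batches` is likewise a hypothetical helper introduced only for this illustration.

```python
import asyncio


class FakeWriter:
    """Hypothetical in-memory stand-in for a topic writer (not a YDB SDK class)."""

    def __init__(self):
        self.pending = []  # written but not yet flushed
        self.flushed = []  # what a reader would see after flush()

    async def write(self, messages):
        await asyncio.sleep(0)  # yield to the loop once, like an I/O-bound write
        self.pending.extend(messages)

    async def flush(self):
        await asyncio.sleep(0)
        self.flushed.extend(self.pending)
        self.pending.clear()


async def write_batches(writer, batches, use_wait_for, timeout=10):
    """Write each batch, optionally wrapped in asyncio.wait_for, then flush once."""
    for batch in batches:
        if use_wait_for:
            await asyncio.wait_for(writer.write(batch), timeout=timeout)
        else:
            await writer.write(batch)
    await writer.flush()


batches = [list(range(10)), list(range(10, 20))]
direct, wrapped = FakeWriter(), FakeWriter()
asyncio.run(write_batches(direct, batches, use_wait_for=False))
asyncio.run(write_batches(wrapped, batches, use_wait_for=True))
# Both call styles should leave all 20 messages flushed, in order.
assert direct.flushed == wrapped.flushed == list(range(20))
```

Because `write_batches` only calls `write()` and `flush()`, the same helper could presumably drive the SDK writer from the test code below, toggling `use_wait_for` to confirm that the wrapper alone changes the outcome.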
Steps to reproduce:
- Write a batch into a topic, wrapping the write call in `asyncio.wait_for` (e.g. `await asyncio.wait_for(writer.write(batch), timeout=10)`).
- Write a second batch the same way, with the same `producer_id`.
- Call `writer.flush()` after writing the second batch.
- In the same session, read from the topic.

Expected: the reader gets both batches in full.
Actual: the reader gets a random amount of data from the batches. Usually it is the entire first batch and none of the second, but sometimes it is only part of the first, or the entire first and part of the second.
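Before attributing the shortfall to lost messages, it may be worth ruling out that a single `receive_batch()` call simply returns fewer messages than were written. The sketch below keeps calling `receive_batch()` (the reader method used in the test code) until `expected` messages have arrived or a deadline passes. `receive_exactly` is a hypothetical helper, and whether `receive_batch()` can legitimately return a partial batch is an assumption to check against the SDK documentation.

```python
import asyncio


async def receive_exactly(reader, expected, timeout=10.0):
    """Hypothetical helper: accumulate messages from reader.receive_batch()
    until `expected` messages are collected or `timeout` seconds elapse."""
    received = []

    async def drain():
        while len(received) < expected:
            batch = await reader.receive_batch()
            received.extend(batch.messages)

    try:
        await asyncio.wait_for(drain(), timeout=timeout)
    except asyncio.TimeoutError:
        pass  # deadline hit: return whatever arrived so the caller can compare counts
    return received
```

In the test, `messages = await receive_exactly(reader, 20)` followed by `assert len(messages) == 20` would distinguish the two cases: if this passes while the single-batch assertion fails, the messages were delivered across several batches rather than lost.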
Related code:
```python
import asyncio
import json
import logging

import ydb

logger = logging.getLogger(__name__)


async def test_flushing(local_ydb):  # local_ydb: fixture providing a local YDB endpoint/database
    driver_config = ydb.DriverConfig(
        local_ydb.endpoint,
        local_ydb.database,
    )
    async with ydb.aio.Driver(driver_config) as driver:
        try:
            await driver.wait(timeout=10)
        except TimeoutError:
            logger.error('[YDB] Failed to connect to local YDB instance')
            logger.error('Last reported errors by discovery:')
            logger.error(driver.discovery_debug_details())
            raise

        client = driver.topic_client
        topic_name = 'test_topic'
        consumers = [
            ydb.TopicConsumer('consumer1', important=True),
            ydb.TopicConsumer('consumer2', important=True),
        ]
        await client.create_topic(topic_name, consumers=consumers)

        # A single writer, so both batches share producer_id='some_rule'.
        writer = client.writer(topic_name, producer_id='some_rule')

        # First batch: messages 0..9, written through asyncio.wait_for.
        messages = [
            ydb.TopicWriterMessage(
                data=json.dumps({'key': f'value_{i}', 'payload': f'payload_{i}'}),
            ) for i in range(10)
        ]
        await asyncio.wait_for(writer.write(messages), timeout=10)

        # Second batch: messages 10..19, same writer, same wrapping.
        messages = [
            ydb.TopicWriterMessage(
                data=json.dumps({'key': f'value_{i}', 'payload': f'payload_{i}'}),
            ) for i in range(10, 20)
        ]
        await asyncio.wait_for(writer.write(messages), timeout=10)
        await writer.flush()

        reader = client.reader(topic_name, 'consumer1')
        batch = await reader.receive_batch()
        assert len(batch.messages) == 20  # fails: fewer than 20 messages are received
        await writer.close()
```
The assertion always fails.
If `await asyncio.wait_for(writer.write(messages), timeout=10)` is replaced with `await writer.write(messages)`, the assertion does not fail.
It also does not fail if the two batches use different `producer_id` values, even with `asyncio.wait_for`.
Other information:
I suspect the issue may be caused by the SDK internally spinning up a separate event loop, which then interacts unexpectedly with `asyncio.wait_for`.
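One way to narrow down this suspicion: `asyncio.wait_for` does not create a new event loop. The wrapped coroutine runs on the caller's running loop. On Python 3.7, `wait_for` does schedule it as a separate `Task` on that same loop, so the difference from a plain `await` is at the task level, not the loop level. The sketch below (pure `asyncio`, no SDK calls; the function names are illustrative) compares the loop seen by a directly awaited coroutine and by one wrapped in `wait_for`. If a second loop is involved in this bug, it would therefore have to come from the SDK itself, which is unconfirmed.

```python
import asyncio


async def capture_loop():
    # Report which event loop is executing this coroutine.
    return asyncio.get_running_loop()


async def compare_loops():
    caller_loop = asyncio.get_running_loop()
    direct_loop = await capture_loop()
    wrapped_loop = await asyncio.wait_for(capture_loop(), timeout=10)
    return caller_loop is direct_loop, caller_loop is wrapped_loop


same_direct, same_wrapped = asyncio.run(compare_loops())
print(same_direct, same_wrapped)  # True True
```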