Commit d1d0d23

Merge pull request #178 from Sync topic reader
2 parents 71da358 + 2249c6f commit d1d0d23

6 files changed: +274 -192 lines changed


tests/topics/test_topic_reader.py

Lines changed: 11 additions & 1 deletion
@@ -2,11 +2,21 @@
 
 
 @pytest.mark.asyncio
-class TestTopicWriterAsyncIO:
+class TestTopicReaderAsyncIO:
     async def test_read_message(
         self, driver, topic_path, topic_with_messages, topic_consumer
     ):
         reader = driver.topic_client.topic_reader(topic_consumer, topic_path)
 
         assert await reader.receive_batch() is not None
         await reader.close()
+
+
+class TestTopicReaderSync:
+    def test_read_message(
+        self, driver_sync, topic_path, topic_with_messages, topic_consumer
+    ):
+        reader = driver_sync.topic_client.topic_reader(topic_consumer, topic_path)
+
+        assert reader.receive_batch() is not None
+        reader.close()
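
The new TestTopicReaderSync mirrors the asyncio test above with blocking calls on the same topic_client API. As a rough usage sketch outside of pytest, assuming a reachable YDB instance and that PublicBatch exposes a messages list with data payloads; the endpoint, database, topic, and consumer names below are placeholders, not part of this diff:

import ydb

# Sketch: read one batch with the synchronous topic reader, mirroring the test.
# Connection parameters and names are illustrative placeholders.
with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
    driver.wait(timeout=5)

    reader = driver.topic_client.topic_reader("my-consumer", "/local/my-topic")
    batch = reader.receive_batch()
    if batch is not None:
        for message in batch.messages:
            print(message.data)
    reader.close()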

ydb/_topic_common/common.py

Lines changed: 37 additions & 0 deletions
@@ -1,4 +1,8 @@
+import asyncio
+import concurrent.futures
+import threading
 import typing
+from typing import Optional
 
 from .. import operation, issues
 from .._grpc.grpcwrapper.common_utils import IFromProtoWithProtoType
@@ -24,3 +28,36 @@ def wrapper(rpc_state, response_pb, driver=None):
         return result_type.from_proto(msg)
 
     return wrapper
+
+
+_shared_event_loop_lock = threading.Lock()
+_shared_event_loop = None  # type: Optional[asyncio.AbstractEventLoop]
+
+
+def _get_shared_event_loop() -> asyncio.AbstractEventLoop:
+    global _shared_event_loop
+
+    if _shared_event_loop is not None:
+        return _shared_event_loop
+
+    with _shared_event_loop_lock:
+        if _shared_event_loop is not None:
+            return _shared_event_loop
+
+        event_loop_set_done = concurrent.futures.Future()
+
+        def start_event_loop():
+            event_loop = asyncio.new_event_loop()
+            event_loop_set_done.set_result(event_loop)
+            asyncio.set_event_loop(event_loop)
+            event_loop.run_forever()
+
+        t = threading.Thread(
+            target=start_event_loop,
+            name="Common ydb topic event loop",
+            daemon=True,
+        )
+        t.start()
+
+        _shared_event_loop = event_loop_set_done.result()
+        return _shared_event_loop
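
The new helper lazily starts a single daemon thread that owns a shared asyncio event loop, with double-checked locking so concurrent callers all get the same loop. Its intended use is letting synchronous wrappers drive asyncio-based internals. A minimal sketch of that pattern using the standard asyncio.run_coroutine_threadsafe; the call_blocking helper and the demo coroutine are illustrative, not part of this diff:

import asyncio

from ydb._topic_common.common import _get_shared_event_loop


def call_blocking(coro, timeout=None):
    # Schedule the coroutine on the shared background loop and block the
    # calling thread until it completes (or the timeout expires).
    loop = _get_shared_event_loop()
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    return future.result(timeout)


async def _demo(value):
    # Stand-in coroutine, only to show the sync-to-async call path.
    await asyncio.sleep(0.01)
    return value


print(call_blocking(_demo(42)))  # prints 42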

ydb/_topic_reader/topic_reader.py

Lines changed: 0 additions & 127 deletions
@@ -1,16 +1,13 @@
-import concurrent.futures
 import enum
 import datetime
 from dataclasses import dataclass
 from typing import (
     Union,
     Optional,
     List,
-    Iterable,
 )
 
 from ..table import RetrySettings
-from .datatypes import ICommittable, PublicBatch, PublicMessage
 from .._topic_common.common import TokenGetterFuncType
 from .._grpc.grpcwrapper.ydb_topic import StreamReadMessage, OffsetsRange
 
@@ -26,130 +23,6 @@ def __init__(self, path, *, partitions: Union[None, int, List[int]] = None):
         self.partitions = partitions
 
 
-class Reader(object):
-    def async_sessions_stat(self) -> concurrent.futures.Future:
-        """
-        Receive stat from the server, return feature.
-        """
-        raise NotImplementedError()
-
-    async def sessions_stat(self) -> List["SessionStat"]:
-        """
-        Receive stat from the server
-
-        use async_sessions_stat for set explicit wait timeout
-        """
-        raise NotImplementedError()
-
-    def messages(
-        self, *, timeout: Union[float, None] = None
-    ) -> Iterable[PublicMessage]:
-        """
-        todo?
-
-        Block until receive new message
-        It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available.
-
-        if no new message in timeout seconds (default - infinite): stop iterations by raise StopIteration
-        if timeout <= 0 - it will fast non block method, get messages from internal buffer only.
-        """
-        raise NotImplementedError()
-
-    def receive_message(self, *, timeout: Union[float, None] = None) -> PublicMessage:
-        """
-        Block until receive new message
-        It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available.
-
-        if no new message in timeout seconds (default - infinite): raise TimeoutError()
-        if timeout <= 0 - it will fast non block method, get messages from internal buffer only.
-        """
-        raise NotImplementedError()
-
-    def async_wait_message(self) -> concurrent.futures.Future:
-        """
-        Return future, which will completed when the reader has least one message in queue.
-        If reader already has message - future will return completed.
-
-        Possible situation when receive signal about message available, but no messages when try to receive a message.
-        If message expired between send event and try to retrieve message (for example connection broken).
-        """
-        raise NotImplementedError()
-
-    def batches(
-        self,
-        *,
-        max_messages: Union[int, None] = None,
-        max_bytes: Union[int, None] = None,
-        timeout: Union[float, None] = None,
-    ) -> Iterable[PublicBatch]:
-        """
-        Block until receive new batch.
-        It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available.
-
-        if no new message in timeout seconds (default - infinite): stop iterations by raise StopIteration
-        if timeout <= 0 - it will fast non block method, get messages from internal buffer only.
-        """
-        raise NotImplementedError()
-
-    def receive_batch(
-        self,
-        *,
-        max_messages: Union[int, None] = None,
-        max_bytes: Union[int, None],
-        timeout: Union[float, None] = None,
-    ) -> Union[PublicBatch, None]:
-        """
-        Get one messages batch from reader
-        It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available.
-
-        if no new message in timeout seconds (default - infinite): raise TimeoutError()
-        if timeout <= 0 - it will fast non block method, get messages from internal buffer only.
-        """
-        raise NotImplementedError()
-
-    def commit(self, mess: ICommittable):
-        """
-        Put commit message to internal buffer.
-
-        For the method no way check the commit result
-        (for example if lost connection - commits will not re-send and committed messages will receive again)
-        """
-        raise NotImplementedError()
-
-    def commit_with_ack(
-        self, mess: ICommittable
-    ) -> Union["CommitResult", List["CommitResult"]]:
-        """
-        write commit message to a buffer and wait ack from the server.
-
-        if receive in timeout seconds (default - infinite): raise TimeoutError()
-        """
-        raise NotImplementedError()
-
-    def async_commit_with_ack(
-        self, mess: ICommittable
-    ) -> Union["CommitResult", List["CommitResult"]]:
-        """
-        write commit message to a buffer and return Future for wait result.
-        """
-        raise NotImplementedError()
-
-    def async_flush(self) -> concurrent.futures.Future:
-        """
-        force send all commit messages from internal buffers to server and return Future for wait server acks.
-        """
-        raise NotImplementedError()
-
-    def flush(self):
-        """
-        force send all commit messages from internal buffers to server and wait acks for all of them.
-        """
-        raise NotImplementedError()
-
-    def close(self):
-        raise NotImplementedError()
-
-
 @dataclass
 class PublicReaderSettings:
     consumer: str
