Skip to content

Commit 12dd38e

Browse files
authored
Merge pull request #124 First iteration of topic reader
2 parents dde12db + abac878 commit 12dd38e

14 files changed

+1908
-296
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Initial implementation of topic reader
2+
13
## 3.0.1b3 ##
24
* Fix error of check retriable error for idempotent operations (error exist since 2.12.1)
35

tests/conftest.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,17 @@ async def driver(endpoint, database, event_loop):
100100

101101

102102
@pytest.fixture()
103-
def topic_path(endpoint) -> str:
103+
def topic_consumer():
104+
return "fixture-consumer"
105+
106+
107+
@pytest.fixture()
108+
def topic_path(endpoint, topic_consumer) -> str:
104109
subprocess.run(
105110
"""docker-compose exec -T ydb /ydb -e grpc://%s -d /local topic drop /local/test-topic"""
106111
% endpoint,
107112
shell=True,
113+
capture_output=True,
108114
)
109115
res = subprocess.run(
110116
"""docker-compose exec -T ydb /ydb -e grpc://%s -d /local topic create /local/test-topic"""
@@ -114,4 +120,26 @@ def topic_path(endpoint) -> str:
114120
)
115121
assert res.returncode == 0, res.stderr + res.stdout
116122

123+
res = subprocess.run(
124+
"""docker-compose exec -T ydb /ydb -e grpc://%s -d /local topic consumer add --consumer %s /local/test-topic"""
125+
% (endpoint, topic_consumer),
126+
shell=True,
127+
capture_output=True,
128+
)
129+
assert res.returncode == 0, res.stderr + res.stdout
130+
117131
return "/local/test-topic"
132+
133+
134+
@pytest.fixture()
135+
@pytest.mark.asyncio()
136+
async def topic_with_messages(driver, topic_path):
137+
pass
138+
writer = driver.topic_client.topic_writer(
139+
topic_path, producer_and_message_group_id="fixture-producer-id"
140+
)
141+
await writer.write_with_ack(
142+
ydb.TopicWriterMessage(data="123".encode()),
143+
ydb.TopicWriterMessage(data="456".encode()),
144+
)
145+
await writer.close()

tests/topics/test_topic_reader.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import pytest
2+
3+
4+
@pytest.mark.asyncio
5+
class TestTopicWriterAsyncIO:
6+
async def test_read_message(
7+
self, driver, topic_path, topic_with_messages, topic_consumer
8+
):
9+
reader = driver.topic_client.topic_reader(topic_consumer, topic_path)
10+
11+
assert await reader.receive_batch() is not None

ydb/_topic_reader/datatypes.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import abc
2+
import enum
3+
from dataclasses import dataclass
4+
import datetime
5+
from typing import Mapping, Union, Any, List, Dict
6+
7+
8+
class ICommittable(abc.ABC):
9+
@property
10+
@abc.abstractmethod
11+
def start_offset(self) -> int:
12+
pass
13+
14+
@property
15+
@abc.abstractmethod
16+
def end_offset(self) -> int:
17+
pass
18+
19+
20+
class ISessionAlive(abc.ABC):
21+
@property
22+
@abc.abstractmethod
23+
def is_alive(self) -> bool:
24+
pass
25+
26+
27+
@dataclass
28+
class PublicMessage(ICommittable, ISessionAlive):
29+
seqno: int
30+
created_at: datetime.datetime
31+
message_group_id: str
32+
session_metadata: Dict[str, str]
33+
offset: int
34+
written_at: datetime.datetime
35+
producer_id: str
36+
data: Union[
37+
bytes, Any
38+
] # set as original decompressed bytes or deserialized object if deserializer set in reader
39+
_partition_session: "PartitionSession"
40+
41+
@property
42+
def start_offset(self) -> int:
43+
raise NotImplementedError()
44+
45+
@property
46+
def end_offset(self) -> int:
47+
raise NotImplementedError()
48+
49+
# ISessionAlive implementation
50+
@property
51+
def is_alive(self) -> bool:
52+
raise NotImplementedError()
53+
54+
55+
@dataclass
56+
class PartitionSession:
57+
id: int
58+
state: "PartitionSession.State"
59+
topic_path: str
60+
partition_id: int
61+
62+
def stop(self):
63+
self.state = PartitionSession.State.Stopped
64+
65+
class State(enum.Enum):
66+
Active = 1
67+
GracefulShutdown = 2
68+
Stopped = 3
69+
70+
71+
@dataclass
72+
class PublicBatch(ICommittable, ISessionAlive):
73+
session_metadata: Mapping[str, str]
74+
messages: List[PublicMessage]
75+
_partition_session: PartitionSession
76+
_bytes_size: int
77+
78+
@property
79+
def start_offset(self) -> int:
80+
raise NotImplementedError()
81+
82+
@property
83+
def end_offset(self) -> int:
84+
raise NotImplementedError()
85+
86+
# ISessionAlive implementation
87+
@property
88+
def is_alive(self) -> bool:
89+
state = self._partition_session.state
90+
return (
91+
state == PartitionSession.State.Active
92+
or state == PartitionSession.State.GracefulShutdown
93+
)

0 commit comments

Comments
 (0)