
Commit bc84294

[FLINK-38560][python] Support ordered mode for async function in Python DataStream API
This closes #27170.
1 parent 19d7580 commit bc84294

4 files changed: +166, -8 lines changed
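
Before the per-file diffs, a minimal sketch of how the new ordered mode is meant to be used from the Python DataStream API. It is modeled on the test added in this commit; the import paths and the environment setup around the call are assumptions, not part of the diff.

import asyncio

from pyflink.common import Row, Time, Types                       # assumed import paths
from pyflink.datastream import StreamExecutionEnvironment, ResultFuture
from pyflink.datastream.functions import AsyncFunction            # assumed location of AsyncFunction
from pyflink.datastream.async_data_stream import AsyncDataStream


class SumAsyncFunction(AsyncFunction):
    """Asynchronously adds the two fields of each input row."""

    async def async_invoke(self, value: Row, result_future: ResultFuture[int]):
        await asyncio.sleep(0.1)                                   # stand-in for real async I/O
        result_future.complete([value[0] + value[1]])

    def timeout(self, value: Row, result_future: ResultFuture[int]):
        result_future.complete([value[0] + value[1]])


env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(
    [(1, 1), (2, 2), (3, 3)],
    type_info=Types.ROW_NAMED(["v1", "v2"], [Types.INT(), Types.INT()]))

# ordered_wait (new in this commit) emits results in input order;
# unordered_wait emits each result as soon as its future completes.
result = AsyncDataStream.ordered_wait(
    ds, SumAsyncFunction(), Time.seconds(5), capacity=2, output_type=Types.INT())
result.print()
env.execute()

The test added below follows the same pattern but writes to a test sink and asserts the exact output order.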

flink-python/pyflink/datastream/async_data_stream.py

Lines changed: 59 additions & 0 deletions
@@ -89,6 +89,65 @@ def unordered_wait_with_retry(
             j_output_type_info,
             j_python_data_stream_function_operator))

+    @staticmethod
+    def ordered_wait(
+            data_stream: DataStream,
+            async_function: AsyncFunction,
+            timeout: Time,
+            capacity: int = 100,
+            output_type: TypeInformation = None) -> 'DataStream':
+        """
+        Adds an async function to the data stream. The order to process input records
+        is guaranteed to be the same as input ones.
+
+        :param data_stream: The input data stream.
+        :param async_function: The async function.
+        :param timeout: The timeout for the asynchronous operation to complete.
+        :param capacity: The max number of async i/o operation that can be triggered.
+        :param output_type: The output data type.
+        :return: The transformed DataStream.
+        """
+        return AsyncDataStream.ordered_wait_with_retry(
+            data_stream, async_function, timeout, async_retry_strategies.NO_RETRY_STRATEGY,
+            capacity, output_type)
+
+    @staticmethod
+    def ordered_wait_with_retry(
+            data_stream: DataStream,
+            async_function: AsyncFunction,
+            timeout: Time,
+            async_retry_strategy: AsyncRetryStrategy,
+            capacity: int = 100,
+            output_type: TypeInformation = None) -> 'DataStream':
+        """
+        Adds an async function with an AsyncRetryStrategy to support retry of AsyncFunction to the
+        data stream. The order to process input records is guaranteed to be the same as input ones.
+
+        :param data_stream: The input data stream.
+        :param async_function: The async function.
+        :param timeout: The timeout for the asynchronous operation to complete.
+        :param async_retry_strategy: The strategy of reattempt async i/o operation that can be
+                                     triggered
+        :param capacity: The max number of async i/o operation that can be triggered.
+        :param output_type: The output data type.
+        :return: The transformed DataStream.
+        """
+        AsyncDataStream._validate(data_stream, async_function, timeout, async_retry_strategy)
+
+        from pyflink.fn_execution import flink_fn_execution_pb2
+        j_python_data_stream_function_operator, j_output_type_info = \
+            _get_one_input_stream_operator(
+                data_stream,
+                AsyncFunctionDescriptor(
+                    async_function, timeout, capacity, async_retry_strategy,
+                    AsyncFunctionDescriptor.OutputMode.ORDERED),
+                flink_fn_execution_pb2.UserDefinedDataStreamFunction.PROCESS,  # type: ignore
+                output_type)
+        return DataStream(data_stream._j_data_stream.transform(
+            "async wait operator",
+            j_output_type_info,
+            j_python_data_stream_function_operator))
+
     @staticmethod
     def _validate(data_stream: DataStream, async_function: AsyncFunction,
                   timeout: Time, async_retry_strategy: AsyncRetryStrategy) -> None:
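
As the delegation above shows, ordered_wait is simply ordered_wait_with_retry with retries disabled. A hedged sketch of the direct call, reusing the ds and SumAsyncFunction names from the earlier sketch; the import path of async_retry_strategies is an assumption (only the NO_RETRY_STRATEGY constant and the AsyncRetryStrategy type appear in this diff):

from pyflink.common import Time, Types
from pyflink.datastream import async_retry_strategies             # assumed import path
from pyflink.datastream.async_data_stream import AsyncDataStream

ordered = AsyncDataStream.ordered_wait_with_retry(
    ds,                                         # any DataStream, e.g. from the sketch above
    SumAsyncFunction(),                         # the AsyncFunction from the sketch above
    Time.seconds(5),                            # timeout for each async operation
    async_retry_strategies.NO_RETRY_STRATEGY,   # swap in any AsyncRetryStrategy to enable retries
    capacity=100,
    output_type=Types.INT())

Apart from the output mode passed in the AsyncFunctionDescriptor (ORDERED instead of UNORDERED), the body mirrors unordered_wait_with_retry.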

flink-python/pyflink/datastream/tests/test_async_function.py

Lines changed: 25 additions & 1 deletion
@@ -45,7 +45,7 @@ def assert_equals_sorted(self, expected, actual):
     def assert_equals(self, expected, actual):
         self.assertEqual(expected, actual)

-    def test_basic_functionality(self):
+    def test_unordered_mode(self):
         self.env.set_parallelism(1)
         ds = self.env.from_collection(
             [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)],
@@ -69,6 +69,30 @@ def timeout(self, value: Row, result_future: ResultFuture[int]):
         expected = ['2', '4', '6', '8', '10']
         self.assert_equals_sorted(expected, results)

+    def test_ordered_mode(self):
+        self.env.set_parallelism(1)
+        ds = self.env.from_collection(
+            [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)],
+            type_info=Types.ROW_NAMED(["v1", "v2"], [Types.INT(), Types.INT()])
+        )
+
+        class MyAsyncFunction(AsyncFunction):
+
+            async def async_invoke(self, value: Row, result_future: ResultFuture[int]):
+                await asyncio.sleep(random.randint(1, 2))
+                result_future.complete([value[0] + value[1]])
+
+            def timeout(self, value: Row, result_future: ResultFuture[int]):
+                result_future.complete([value[0] + value[1]])
+
+        ds = AsyncDataStream.ordered_wait(
+            ds, MyAsyncFunction(), Time.seconds(5), 2, Types.INT())
+        ds.add_sink(self.test_sink)
+        self.env.execute()
+        results = self.test_sink.get_results(False)
+        expected = ['2', '4', '6', '8', '10']
+        self.assert_equals(expected, results)
+
     def test_watermark(self):
         self.env.set_parallelism(1)
         ds = self.env.from_collection(
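
Note the assertion difference between the two tests: the unordered test checks with assert_equals_sorted because completion order is nondeterministic, while the ordered test can use assert_equals since ordered_wait guarantees output in input order. A standalone asyncio sketch of that distinction (purely illustrative, no Flink involved):

import asyncio
import random


async def work(i: int) -> int:
    # A random sleep, like the tests above, makes completion order nondeterministic.
    await asyncio.sleep(random.uniform(0.01, 0.05))
    return i * 2


async def main():
    tasks = [asyncio.create_task(work(i)) for i in range(1, 6)]
    completion_order = [await t for t in asyncio.as_completed(tasks)]
    input_order = await asyncio.gather(*tasks)      # tasks already done; results in input order
    print(completion_order)   # some permutation of [2, 4, 6, 8, 10] -> compare sorted (unordered)
    print(input_order)        # always [2, 4, 6, 8, 10]              -> compare exactly (ordered)


asyncio.run(main())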

flink-python/pyflink/fn_execution/datastream/process/async_function/operation.py

Lines changed: 2 additions & 2 deletions
@@ -24,7 +24,7 @@
 from pyflink.datastream import RuntimeContext, ResultFuture
 from pyflink.datastream.functions import AsyncFunctionDescriptor, AsyncRetryStrategy
 from pyflink.fn_execution.datastream.process.async_function.queue import \
-    UnorderedStreamElementQueue, StreamElementQueue
+    UnorderedStreamElementQueue, StreamElementQueue, OrderedStreamElementQueue
 from pyflink.fn_execution.datastream.process.operations import Operation
 from pyflink.fn_execution.datastream.process.runtime_context import StreamingRuntimeContext

@@ -291,7 +291,7 @@ def __init__(self, serialized_fn, operator_state_backend):
         if output_mode == AsyncFunctionDescriptor.OutputMode.UNORDERED:
             self._queue = UnorderedStreamElementQueue(capacity, self._raise_exception_if_exists)
         else:
-            raise NotImplementedError("ORDERED mode is still not supported.")
+            self._queue = OrderedStreamElementQueue(capacity, self._raise_exception_if_exists)
         self._emitter = None
         self._async_function_runner = None
         self._exception = None
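
The dispatch above is the runtime counterpart of the OutputMode chosen in async_data_stream.py. A minimal sketch of the mapping; the _create_queue helper is hypothetical and exists only to spell out the two branches in this diff:

from pyflink.datastream.functions import AsyncFunctionDescriptor
from pyflink.fn_execution.datastream.process.async_function.queue import \
    UnorderedStreamElementQueue, OrderedStreamElementQueue


def _create_queue(output_mode, capacity, exception_checker):
    # Hypothetical helper mirroring the branch in operation.py: UNORDERED keeps the
    # existing unordered queue, while ORDERED (the new mode) gets OrderedStreamElementQueue.
    if output_mode == AsyncFunctionDescriptor.OutputMode.UNORDERED:
        return UnorderedStreamElementQueue(capacity, exception_checker)
    return OrderedStreamElementQueue(capacity, exception_checker)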

flink-python/pyflink/fn_execution/datastream/process/async_function/queue.py

Lines changed: 80 additions & 5 deletions
@@ -17,7 +17,7 @@
 ################################################################################
 import collections
 import threading
-from abc import ABC
+from abc import ABC, abstractmethod
 from typing import Generic, TypeVar, List

 from pyflink.datastream import ResultFuture
@@ -34,12 +34,14 @@ class StreamElementQueueEntry(ABC, ResultFuture, Generic[OUT]):
     allows to set the result of a completed entry through ResultFuture.
     """

+    @abstractmethod
     def is_done(self) -> bool:
         """
         True if the stream element queue entry has been completed; otherwise false.
         """
         pass

+    @abstractmethod
     def emit_result(self, output_processor) -> int:
         """
         Emits the results associated with this queue entry.
@@ -109,6 +111,7 @@ def complete(self, result: List[OUT]):

 class StreamElementQueue(ABC, Generic[OUT]):

+    @abstractmethod
     def put(self, windowed_value, timestamp, watermark, record) -> ResultFuture[OUT]:
         """
         Put the given record in the queue. This operation blocks until the queue has
@@ -125,6 +128,7 @@ def put(self, windowed_value, timestamp, watermark, record) -> ResultFuture[OUT]
         """
         pass

+    @abstractmethod
     def advance_watermark(self, watermark):
         """
         Tries to put the given watermark in the queue. This operation succeeds if the queue has
@@ -134,6 +138,7 @@ def advance_watermark(self, watermark):
         """
         pass

+    @abstractmethod
     def emit_completed_element(self, output_processor):
         """
         Emits one completed element from the head of this queue into the given output.
@@ -142,30 +147,35 @@ def emit_completed_element(self, output_processor):
         """
         pass

+    @abstractmethod
     def has_completed_elements(self) -> bool:
         """
         Checks if there is at least one completed head element.
         """
         pass

+    @abstractmethod
     def wait_for_completed_elements(self):
         """
         Waits until there is completed elements.
         """
         pass

-    def wait_for_in_flight_elements_processed(self):
+    @abstractmethod
+    def wait_for_in_flight_elements_processed(self, timeout=1):
         """
         Waits until any inflight elements have been processed.
         """
         pass

+    @abstractmethod
     def is_empty(self) -> bool:
         """
         True if the queue is empty; otherwise false.
         """
         pass

+    @abstractmethod
     def size(self) -> int:
         """
         Return the size of the queue.
@@ -265,9 +275,10 @@ def put(self, windowed_value, timestamp, watermark, record) -> ResultFuture[OUT]
         return entry

     def advance_watermark(self, watermark):
-        with self._lock:
-            if watermark > self._current_watermark:
-                self._current_watermark = watermark
+        if watermark > self._current_watermark:
+            self._current_watermark = watermark
+
+        with self._lock:
             self._add_watermark(watermark)

     def emit_completed_element(self, output_processor):
@@ -343,3 +354,67 @@ def _add_segment(self, capacity) -> 'UnorderedStreamElementQueue.Segment':
         new_segment = UnorderedStreamElementQueue.Segment(capacity)
         self._segments.append(new_segment)
         return new_segment
+
+
+class OrderedStreamElementQueue(StreamElementQueue):
+
+    def __init__(self, capacity: int, exception_checker):
+        self._capacity = capacity
+        self._exception_checker = exception_checker
+        self._queue = collections.deque()
+        self._lock = threading.RLock()
+        self._not_full = threading.Condition(self._lock)
+        self._not_empty = threading.Condition(self._lock)
+        self._number_of_pending_entries = 0
+
+    def put(self, windowed_value, timestamp, watermark, record) -> ResultFuture[OUT]:
+        with self._not_full:
+            while self.size() >= self._capacity:
+                self._not_full.wait(1)
+                self._exception_checker()
+
+            entry = StreamRecordQueueEntry(windowed_value, timestamp, watermark, record)
+            entry.on_complete(self.on_complete_handler)
+            self._queue.append(entry)
+            self._number_of_pending_entries += 1
+            return entry
+
+    def advance_watermark(self, watermark):
+        # do nothing in ordered mode
+        pass
+
+    def emit_completed_element(self, output_processor):
+        with self._not_full:
+            if not self.has_completed_elements():
+                return
+
+            self._queue.popleft().emit_result(output_processor)
+            self._number_of_pending_entries -= 1
+            self._not_full.notify_all()
+
+    def has_completed_elements(self) -> bool:
+        with self._lock:
+            return len(self._queue) > 0 and self._queue[0].is_done()
+
+    def wait_for_completed_elements(self):
+        with self._not_empty:
+            while not self.has_completed_elements():
+                self._not_empty.wait()
+
+    def wait_for_in_flight_elements_processed(self, timeout=1):
+        with self._not_full:
+            if self._number_of_pending_entries != 0:
+                self._not_full.wait(timeout)
+
+    def is_empty(self) -> bool:
+        with self._lock:
+            return self._number_of_pending_entries == 0
+
+    def size(self) -> int:
+        with self._lock:
+            return self._number_of_pending_entries
+
+    def on_complete_handler(self, entry):
+        with self._not_empty:
+            if self.has_completed_elements():
+                self._not_empty.notify()
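
To make the OrderedStreamElementQueue contract concrete, here is a standalone, simplified sketch of the same idea (a toy model only: no watermarks, no exception checking, and no blocking emitter thread as in the real operator):

import collections
import threading
from typing import Any, Callable, List, Optional


class _Entry:
    """Simplified stand-in for StreamRecordQueueEntry: holds one record and its result."""

    def __init__(self, record: Any):
        self.record = record
        self.result: Optional[List[Any]] = None
        self.done = False

    def complete(self, result: List[Any]):
        self.result = result
        self.done = True


class MiniOrderedQueue:
    """Toy version of the ordered queue: entries may complete in any order, but results
    are emitted strictly in the order the records were put in."""

    def __init__(self, capacity: int):
        self._capacity = capacity
        self._deque = collections.deque()
        self._lock = threading.RLock()
        self._not_full = threading.Condition(self._lock)

    def put(self, record: Any) -> _Entry:
        with self._not_full:
            while len(self._deque) >= self._capacity:
                self._not_full.wait(1)
            entry = _Entry(record)
            self._deque.append(entry)
            return entry

    def emit_completed(self, emit: Callable[[List[Any]], None]):
        # Emit from the head only; a completed entry behind an uncompleted head waits,
        # which is exactly what preserves the input order.
        with self._not_full:
            while self._deque and self._deque[0].done:
                emit(self._deque.popleft().result)
                self._not_full.notify_all()


q = MiniOrderedQueue(capacity=3)
e1, e2 = q.put("a"), q.put("b")
e2.complete(["B"])            # second entry finishes first...
q.emit_completed(print)       # ...but nothing is emitted yet (head not done)
e1.complete(["A"])
q.emit_completed(print)       # now prints ['A'] then ['B'], in input order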
