Skip to content

Commit 3fbf8f5

Browse files
committed
[FLINK-38615][python] Optimize AsyncFunction to remove ResultFuture
1 parent c3bf061 commit 3fbf8f5

File tree

5 files changed

+95
-97
lines changed

5 files changed

+95
-97
lines changed

flink-python/pyflink/datastream/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@
272272
KeyedCoProcessFunction, AggregateFunction, WindowFunction,
273273
ProcessWindowFunction, BroadcastProcessFunction,
274274
KeyedBroadcastProcessFunction, AsyncFunction,
275-
ResultFuture, AsyncRetryPredicate, AsyncRetryStrategy)
275+
AsyncRetryPredicate, AsyncRetryStrategy)
276276
from pyflink.datastream.slot_sharing_group import SlotSharingGroup, MemorySize
277277
from pyflink.datastream.state_backend import (StateBackend, CustomStateBackend,
278278
PredefinedOptions, HashMapStateBackend,
@@ -345,7 +345,6 @@
345345
'SlotSharingGroup',
346346
'MemorySize',
347347
'OutputTag',
348-
'ResultFuture',
349348
'AsyncRetryPredicate',
350349
'AsyncRetryStrategy'
351350
]

flink-python/pyflink/datastream/functions.py

Lines changed: 6 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
'KeyedBroadcastProcessFunction',
5757
'AsyncFunction',
5858
'AsyncFunctionDescriptor',
59-
'ResultFuture',
6059
'AsyncRetryPredicate',
6160
'AsyncRetryStrategy',
6261
]
@@ -979,68 +978,32 @@ def exponential_backoff(
979978
result_predicate, exception_predicate)
980979

981980

982-
class ResultFuture(Generic[OUT]):
983-
"""
984-
Collects data / error in user codes while processing async i/o.
985-
"""
986-
987-
@abstractmethod
988-
def complete(self, result: List[OUT]):
989-
"""
990-
Completes the result future with a collection of result objects.
991-
992-
Note that it should be called for exactly one time in the user code. Calling this function
993-
multiple times will cause data loss.
994-
995-
Put all results in a collection and then emit output.
996-
997-
:param result: A list of results.
998-
"""
999-
pass
1000-
1001-
@abstractmethod
1002-
def complete_exceptionally(self, error: Exception):
1003-
"""
1004-
Completes the result future exceptionally with an exception.
1005-
1006-
:param error: An Exception object.
1007-
"""
1008-
pass
1009-
1010-
1011981
class AsyncFunction(Function, Generic[IN, OUT]):
1012982
"""
1013983
A function to trigger Async I/O operation.
1014984
1015-
For each #async_invoke, an async io operation can be triggered, and once it has been done, the
1016-
result can be collected by calling :func:`~ResultFuture.complete`. For each async operation, its
985+
For each #async_invoke, an async io operation can be triggered. For each async operation, its
1017986
context is stored in the operator immediately after invoking #async_invoke, avoiding blocking
1018987
for each stream input as long as the internal buffer is not full.
1019-
1020-
:class:`~ResultFuture` can be passed into callbacks or futures to collect the result data. An
1021-
error can also be propagated to the async IO operator by
1022-
:func:`~ResultFuture.complete_exceptionally`.
1023988
"""
1024989

1025990
@abstractmethod
1026-
async def async_invoke(self, value: IN, result_future: ResultFuture[OUT]):
991+
async def async_invoke(self, value: IN) -> List[OUT]:
1027992
"""
1028993
Trigger async operation for each stream input.
1029994
In case of a user code error, you can raise an exception to make the task fail and
1030995
trigger fail-over process.
1031996
1032997
:param value: Input element coming from an upstream task.
1033-
:param result_future: A future to be completed with the result data.
1034998
"""
1035999
pass
10361000

1037-
def timeout(self, value: IN, result_future: ResultFuture[OUT]):
1001+
def timeout(self, value: IN) -> List[OUT]:
10381002
"""
1039-
In case :func:`~ResultFuture.async_invoke` timeout occurred. By default, the result future
1040-
is exceptionally completed with a timeout exception.
1003+
Called when :func:`~AsyncFunction.async_invoke` times out. By default, it raises
1004+
a timeout exception.
10411005
"""
1042-
result_future.complete_exceptionally(
1043-
TimeoutError("Async function call has timed out for input: " + str(value)))
1006+
raise TimeoutError("Async function call has timed out for input: " + str(value))
10441007

10451008

10461009
class AsyncFunctionDescriptor(object):

flink-python/pyflink/datastream/tests/test_async_function.py

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from typing import List
2121

2222
from pyflink.common import Types, Row, Time, Configuration, WatermarkStrategy
23-
from pyflink.datastream import AsyncDataStream, AsyncFunction, ResultFuture, \
23+
from pyflink.datastream import AsyncDataStream, AsyncFunction, \
2424
StreamExecutionEnvironment, AsyncRetryStrategy
2525
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction, \
2626
SecondColumnTimestampAssigner
@@ -54,12 +54,12 @@ def test_unordered_mode(self):
5454

5555
class MyAsyncFunction(AsyncFunction):
5656

57-
async def async_invoke(self, value: Row, result_future: ResultFuture[int]):
57+
async def async_invoke(self, value: Row):
5858
await asyncio.sleep(2)
59-
result_future.complete([value[0] + value[1]])
59+
return [value[0] + value[1]]
6060

61-
def timeout(self, value: Row, result_future: ResultFuture[int]):
62-
result_future.complete([value[0] + value[1]])
61+
def timeout(self, value: Row):
62+
return [value[0] + value[1]]
6363

6464
ds = AsyncDataStream.unordered_wait(
6565
ds, MyAsyncFunction(), Time.seconds(5), 2, Types.INT())
@@ -78,12 +78,12 @@ def test_ordered_mode(self):
7878

7979
class MyAsyncFunction(AsyncFunction):
8080

81-
async def async_invoke(self, value: Row, result_future: ResultFuture[int]):
81+
async def async_invoke(self, value: Row):
8282
await asyncio.sleep(random.randint(1, 2))
83-
result_future.complete([value[0] + value[1]])
83+
return [value[0] + value[1]]
8484

85-
def timeout(self, value: Row, result_future: ResultFuture[int]):
86-
result_future.complete([value[0] + value[1]])
85+
def timeout(self, value: Row):
86+
return [value[0] + value[1]]
8787

8888
ds = AsyncDataStream.ordered_wait(
8989
ds, MyAsyncFunction(), Time.seconds(5), 2, Types.INT())
@@ -110,12 +110,12 @@ def test_watermark(self):
110110

111111
class MyAsyncFunction(AsyncFunction):
112112

113-
async def async_invoke(self, value: Row, result_future: ResultFuture[int]):
113+
async def async_invoke(self, value: Row):
114114
await asyncio.sleep(random.randint(1, 3))
115-
result_future.complete([value[0] + value[1]])
115+
return [value[0] + value[1]]
116116

117-
def timeout(self, value: Row, result_future: ResultFuture[int]):
118-
result_future.complete([value[0] + value[1]])
117+
def timeout(self, value: Row):
118+
return [value[0] + value[1]]
119119

120120
ds = AsyncDataStream.unordered_wait(
121121
ds, MyAsyncFunction(), Time.seconds(5), 2, Types.INT())
@@ -135,12 +135,12 @@ def test_complete_async_function_with_non_iterable_result(self):
135135

136136
class MyAsyncFunction(AsyncFunction):
137137

138-
async def async_invoke(self, value: Row, result_future: ResultFuture[int]):
138+
async def async_invoke(self, value: Row):
139139
await asyncio.sleep(2)
140-
result_future.complete(value[0] + value[1])
140+
return value[0] + value[1]
141141

142-
def timeout(self, value: Row, result_future: ResultFuture[int]):
143-
result_future.complete(value[0] + value[1])
142+
def timeout(self, value: Row):
143+
return value[0] + value[1]
144144

145145
ds = AsyncDataStream.unordered_wait(
146146
ds, MyAsyncFunction(), Time.seconds(5), 2, Types.INT())
@@ -161,12 +161,12 @@ def test_complete_async_function_with_exception(self):
161161

162162
class MyAsyncFunction(AsyncFunction):
163163

164-
async def async_invoke(self, value: Row, result_future: ResultFuture[int]):
165-
result_future.complete_exceptionally(Exception("encountered an exception"))
164+
async def async_invoke(self, value: Row):
165+
raise Exception("encountered an exception")
166166

167-
def timeout(self, value: Row, result_future: ResultFuture[int]):
167+
def timeout(self, value: Row):
168168
# raise the same exception to make sure test case is stable in all cases
169-
result_future.complete_exceptionally(Exception("encountered an exception"))
169+
raise Exception("encountered an exception")
170170

171171
ds = AsyncDataStream.unordered_wait(
172172
ds, MyAsyncFunction(), Time.seconds(5), 2, Types.INT())
@@ -186,10 +186,10 @@ def test_raise_exception_in_async_invoke(self):
186186

187187
class MyAsyncFunction(AsyncFunction):
188188

189-
async def async_invoke(self, value: Row, result_future: ResultFuture[int]):
189+
async def async_invoke(self, value: Row):
190190
raise Exception("encountered an exception")
191191

192-
def timeout(self, value: Row, result_future: ResultFuture[int]):
192+
def timeout(self, value: Row):
193193
# raise the same exception to make sure test case is stable in all cases
194194
raise Exception("encountered an exception")
195195

@@ -211,11 +211,11 @@ def test_raise_exception_in_timeout(self):
211211

212212
class MyAsyncFunction(AsyncFunction):
213213

214-
async def async_invoke(self, value: Row, result_future: ResultFuture[int]):
214+
async def async_invoke(self, value: Row):
215215
await asyncio.sleep(10)
216-
result_future.complete([value[0] + value[1]])
216+
return [value[0] + value[1]]
217217

218-
def timeout(self, value: Row, result_future: ResultFuture[int]):
218+
def timeout(self, value: Row):
219219
raise Exception("encountered an exception")
220220

221221
ds = AsyncDataStream.unordered_wait(
@@ -236,12 +236,12 @@ def test_processing_timeout(self):
236236

237237
class MyAsyncFunction(AsyncFunction):
238238

239-
async def async_invoke(self, value: Row, result_future: ResultFuture[int]):
239+
async def async_invoke(self, value: Row):
240240
await asyncio.sleep(10)
241-
result_future.complete([value[0] + value[1]])
241+
return [value[0] + value[1]]
242242

243-
def timeout(self, value: Row, result_future: ResultFuture[int]):
244-
result_future.complete([value[0] - value[1]])
243+
def timeout(self, value: Row):
244+
return [value[0] - value[1]]
245245

246246
ds = AsyncDataStream.unordered_wait(
247247
ds, MyAsyncFunction(), Time.seconds(1), 2, Types.INT())
@@ -264,19 +264,19 @@ def __init__(self):
264264
self.retries_1 = {}
265265
self.retries_2 = {}
266266

267-
async def async_invoke(self, value: Row, result_future: ResultFuture[int]):
267+
async def async_invoke(self, value: Row):
268268
await asyncio.sleep(1)
269269
if value in self.retries_2:
270-
result_future.complete([value[0] + value[1]])
270+
return [value[0] + value[1]]
271271
elif value in self.retries_1:
272272
self.retries_2[value] = True
273-
result_future.complete([value[0] + value[1] + 1])
273+
return [value[0] + value[1] + 1]
274274
else:
275275
self.retries_1[value] = True
276-
result_future.complete_exceptionally(ValueError("failed the first time"))
276+
raise ValueError("failed the first time")
277277

278-
def timeout(self, value: Row, result_future: ResultFuture[int]):
279-
result_future.complete([value[0] + value[1]])
278+
def timeout(self, value: Row):
279+
return [value[0] + value[1]]
280280

281281
def result_predicate(result: List[int]):
282282
return result[0] % 2 == 1
@@ -312,9 +312,9 @@ def test_run_async_function_in_thread_mode(self):
312312

313313
class MyAsyncFunction(AsyncFunction):
314314

315-
async def async_invoke(self, value: Row, result_future: ResultFuture[int]):
315+
async def async_invoke(self, value: Row):
316316
await asyncio.sleep(2)
317-
result_future.complete([value[0] + value[1]])
317+
return [value[0] + value[1]]
318318

319319
try:
320320
AsyncDataStream.unordered_wait(

flink-python/pyflink/fn_execution/datastream/process/async_function/operation.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121
from datetime import datetime
2222
from typing import TypeVar, Generic, List, Iterable, Callable, Optional
2323

24-
from pyflink.datastream import RuntimeContext, ResultFuture
24+
from pyflink.datastream import RuntimeContext
2525
from pyflink.datastream.functions import AsyncFunctionDescriptor, AsyncRetryStrategy
2626
from pyflink.fn_execution.datastream.process.async_function.queue import \
27-
UnorderedStreamElementQueue, StreamElementQueue, OrderedStreamElementQueue
27+
UnorderedStreamElementQueue, StreamElementQueue, OrderedStreamElementQueue, ResultFuture
2828
from pyflink.fn_execution.datastream.process.operations import Operation
2929
from pyflink.fn_execution.datastream.process.runtime_context import StreamingRuntimeContext
3030

@@ -63,7 +63,7 @@ class ResultHandler(ResultFuture, Generic[IN, OUT]):
6363

6464
def __init__(self,
6565
classname: str,
66-
timeout_func: Callable[[IN, ResultFuture[OUT]], None],
66+
timeout_func: Callable[[IN], List[OUT]],
6767
exception_handler: Callable[[Exception], None],
6868
record: IN,
6969
result_future: ResultFuture[OUT]):
@@ -115,7 +115,11 @@ def _process_results(self, result: List[OUT]):
115115

116116
def _timer_triggered(self):
117117
if not self._completed.get():
118-
self._timeout_func(self._record, self)
118+
try:
119+
result = self._timeout_func(self._record)
120+
self._result_future.complete(result)
121+
except Exception as e:
122+
self._result_future.complete_exceptionally(e)
119123

120124

121125
class RetryableResultHandler(ResultFuture, Generic[IN, OUT]):
@@ -192,7 +196,11 @@ def _timer_triggered(self):
192196
# force reset _retry_awaiting to prevent the handler from triggering a retry unnecessarily
193197
self._retry_awaiting.set(False)
194198

195-
self._result_handler._timeout_func(self._result_handler._record, self)
199+
try:
200+
result = self._result_handler._timeout_func(self._result_handler._record)
201+
self._result_handler._result_future.complete(result)
202+
except Exception as e:
203+
self._result_handler._result_future.complete_exceptionally(e)
196204

197205

198206
class Emitter(threading.Thread):
@@ -223,9 +231,8 @@ def stop(self):
223231

224232

225233
class AsyncFunctionRunner(threading.Thread):
226-
def __init__(self, exception_handler: Callable[[Exception], None]):
234+
def __init__(self):
227235
super().__init__()
228-
self._exception_handler = exception_handler
229236
self._loop = None
230237
self._ready = threading.Event()
231238

@@ -251,14 +258,15 @@ def stop(self):
251258
self._loop.call_soon_threadsafe(self._loop.stop)
252259
self.join(timeout=1.0)
253260

254-
async def exception_handler_wrapper(self, async_function, *arg):
261+
async def exception_handler_wrapper(self, async_function, record, result_handler):
255262
try:
256-
await async_function(*arg)
263+
result = await async_function(record)
264+
result_handler.complete(result)
257265
except Exception as e:
258-
self._exception_handler(e)
266+
result_handler.complete_exceptionally(e)
259267

260-
def run_async(self, async_function, *arg):
261-
wrapped_function = self.exception_handler_wrapper(async_function, *arg)
268+
def run_async(self, async_function, record, result_handler):
269+
wrapped_function = self.exception_handler_wrapper(async_function, record, result_handler)
262270
asyncio.run_coroutine_threadsafe(wrapped_function, self._loop)
263271

264272

@@ -306,7 +314,7 @@ def open(self):
306314
self._emitter.daemon = True
307315
self._emitter.start()
308316

309-
self._async_function_runner = AsyncFunctionRunner(self._mark_exception)
317+
self._async_function_runner = AsyncFunctionRunner()
310318
self._async_function_runner.daemon = True
311319
self._async_function_runner.start()
312320
self._async_function_runner.wait_ready()

0 commit comments

Comments
 (0)