diff --git a/distributed/worker.py b/distributed/worker.py index 396ae203925..cd7f60efea1 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2975,49 +2975,50 @@ def apply_function_simple( ------- msg: dictionary with status, result/error, timings, etc.. """ - ident = threading.get_ident() - try: - # meter("thread-cpu").delta - # difference in thread_time() before and after function call, minus user calls - # to context_meter inside the function. Published to Server.digests as - # {("execute", , "thread-cpu", "seconds"): } - # m.delta - # difference in wall time before and after function call, minus thread-cpu, - # minus user calls to context_meter. Published to Server.digests as - # {("execute", , "thread-noncpu", "seconds"): } - # m.stop - m.start - # difference in wall time before and after function call, without subtracting - # anything. This is used in scheduler heuristics, e.g. task stealing. - with context_meter.meter("thread-noncpu", func=time) as m: - with context_meter.meter("thread-cpu", func=thread_time): - result = function(*args, **kwargs) - except (SystemExit, KeyboardInterrupt): - # Special-case these, just like asyncio does all over the place. They will pass - # through `fail_hard` and `_handle_stimulus_from_task`, and eventually be caught - # by special-case logic in asyncio: - # https://github.com/python/cpython/blob/v3.9.4/Lib/asyncio/events.py#L81-L82 - # Any other `BaseException` types would ultimately be ignored by asyncio if - # raised here, after messing up the worker state machine along their way. - raise - except BaseException as e: - # Users _shouldn't_ use `BaseException`s, but if they do, we can assume they - # aren't a reason to shut down the whole system (since we allow the - # system-shutting-down `SystemExit` and `KeyboardInterrupt` to pass through) - msg = error_message(e) - msg["op"] = "task-erred" - msg["actual-exception"] = e - else: - msg = { - "op": "task-finished", - "status": "OK", - "result": result, - "nbytes": sizeof(result), - "type": type(result) if result is not None else None, - } + # meter("thread-cpu").delta + # Difference in thread_time() before and after function call, minus user calls + # to context_meter inside the function. Published to Server.digests as + # {("execute", , "thread-cpu", "seconds"): } + # m.delta + # Difference in wall time before and after function call, minus thread-cpu, + # minus user calls to context_meter. Published to Server.digests as + # {("execute", , "thread-noncpu", "seconds"): } + # m.stop - m.start + # Difference in wall time before and after function call, without subtracting + # anything. This is used in scheduler heuristics, e.g. task stealing. + with ( + context_meter.meter("thread-noncpu", func=time) as m, + context_meter.meter("thread-cpu", func=thread_time), + ): + try: + result = function(*args, **kwargs) + except (SystemExit, KeyboardInterrupt): + # Special-case these, just like asyncio does all over the place. They will + # pass through `fail_hard` and `_handle_stimulus_from_task`, and eventually + # be caught by special-case logic in asyncio: + # https://github.com/python/cpython/blob/v3.9.4/Lib/asyncio/events.py#L81-L82 + # Any other `BaseException` types would ultimately be ignored by asyncio if + # raised here, after messing up the worker state machine along their way. + raise + except BaseException as e: + # Users _shouldn't_ use `BaseException`s, but if they do, we can assume they + # aren't a reason to shut down the whole system (since we allow the + # system-shutting-down `SystemExit` and `KeyboardInterrupt` to pass through) + msg = error_message(e) + msg["op"] = "task-erred" + msg["actual-exception"] = e + else: + msg = { + "op": "task-finished", + "status": "OK", + "result": result, + "nbytes": sizeof(result), + "type": type(result) if result is not None else None, + } msg["start"] = m.start + time_delay msg["stop"] = m.stop + time_delay - msg["thread"] = ident + msg["thread"] = threading.get_ident() return msg @@ -3033,39 +3034,38 @@ async def apply_function_async( ------- msg: dictionary with status, result/error, timings, etc.. """ - ident = threading.get_ident() - try: - with context_meter.meter("thread-noncpu", func=time) as m: + with context_meter.meter("thread-noncpu", func=time) as m: + try: result = await function(*args, **kwargs) - except (SystemExit, KeyboardInterrupt): - # Special-case these, just like asyncio does all over the place. They will pass - # through `fail_hard` and `_handle_stimulus_from_task`, and eventually be caught - # by special-case logic in asyncio: - # https://github.com/python/cpython/blob/v3.9.4/Lib/asyncio/events.py#L81-L82 - # Any other `BaseException` types would ultimately be ignored by asyncio if - # raised here, after messing up the worker state machine along their way. - raise - except BaseException as e: - # NOTE: this includes `CancelledError`! Since it's a user task, that's _not_ a - # reason to shut down the worker. - # Users _shouldn't_ use `BaseException`s, but if they do, we can assume they - # aren't a reason to shut down the whole system (since we allow the - # system-shutting-down `SystemExit` and `KeyboardInterrupt` to pass through) - msg = error_message(e) - msg["op"] = "task-erred" - msg["actual-exception"] = e - else: - msg = { - "op": "task-finished", - "status": "OK", - "result": result, - "nbytes": sizeof(result), - "type": type(result) if result is not None else None, - } + except (SystemExit, KeyboardInterrupt): + # Special-case these, just like asyncio does all over the place. They will + # pass through `fail_hard` and `_handle_stimulus_from_task`, and eventually + # be caught by special-case logic in asyncio: + # https://github.com/python/cpython/blob/v3.9.4/Lib/asyncio/events.py#L81-L82 + # Any other `BaseException` types would ultimately be ignored by asyncio if + # raised here, after messing up the worker state machine along their way. + raise + except BaseException as e: + # NOTE: this includes `CancelledError`! Since it's a user task, that's _not_ + # a reason to shut down the worker. + # Users _shouldn't_ use `BaseException`s, but if they do, we can assume they + # aren't a reason to shut down the whole system (since we allow the + # system-shutting-down `SystemExit` and `KeyboardInterrupt` to pass through) + msg = error_message(e) + msg["op"] = "task-erred" + msg["actual-exception"] = e + else: + msg = { + "op": "task-finished", + "status": "OK", + "result": result, + "nbytes": sizeof(result), + "type": type(result) if result is not None else None, + } msg["start"] = m.start + time_delay msg["stop"] = m.stop + time_delay - msg["thread"] = ident + msg["thread"] = threading.get_ident() return msg