Skip to content

Commit

Permalink
Don't obfuscate fine performance metrics failures (dask#8568)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky authored and milesgranger committed Mar 13, 2024
1 parent 28fd828 commit 119c7d8
Showing 1 changed file with 69 additions and 69 deletions.
138 changes: 69 additions & 69 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2975,49 +2975,50 @@ def apply_function_simple(
-------
msg: dictionary with status, result/error, timings, etc..
"""
ident = threading.get_ident()
try:
# meter("thread-cpu").delta
# difference in thread_time() before and after function call, minus user calls
# to context_meter inside the function. Published to Server.digests as
# {("execute", <prefix>, "thread-cpu", "seconds"): <value>}
# m.delta
# difference in wall time before and after function call, minus thread-cpu,
# minus user calls to context_meter. Published to Server.digests as
# {("execute", <prefix>, "thread-noncpu", "seconds"): <value>}
# m.stop - m.start
# difference in wall time before and after function call, without subtracting
# anything. This is used in scheduler heuristics, e.g. task stealing.
with context_meter.meter("thread-noncpu", func=time) as m:
with context_meter.meter("thread-cpu", func=thread_time):
result = function(*args, **kwargs)
except (SystemExit, KeyboardInterrupt):
# Special-case these, just like asyncio does all over the place. They will pass
# through `fail_hard` and `_handle_stimulus_from_task`, and eventually be caught
# by special-case logic in asyncio:
# https://github.com/python/cpython/blob/v3.9.4/Lib/asyncio/events.py#L81-L82
# Any other `BaseException` types would ultimately be ignored by asyncio if
# raised here, after messing up the worker state machine along their way.
raise
except BaseException as e:
# Users _shouldn't_ use `BaseException`s, but if they do, we can assume they
# aren't a reason to shut down the whole system (since we allow the
# system-shutting-down `SystemExit` and `KeyboardInterrupt` to pass through)
msg = error_message(e)
msg["op"] = "task-erred"
msg["actual-exception"] = e
else:
msg = {
"op": "task-finished",
"status": "OK",
"result": result,
"nbytes": sizeof(result),
"type": type(result) if result is not None else None,
}
# meter("thread-cpu").delta
# Difference in thread_time() before and after function call, minus user calls
# to context_meter inside the function. Published to Server.digests as
# {("execute", <prefix>, "thread-cpu", "seconds"): <value>}
# m.delta
# Difference in wall time before and after function call, minus thread-cpu,
# minus user calls to context_meter. Published to Server.digests as
# {("execute", <prefix>, "thread-noncpu", "seconds"): <value>}
# m.stop - m.start
# Difference in wall time before and after function call, without subtracting
# anything. This is used in scheduler heuristics, e.g. task stealing.
with (
context_meter.meter("thread-noncpu", func=time) as m,
context_meter.meter("thread-cpu", func=thread_time),
):
try:
result = function(*args, **kwargs)
except (SystemExit, KeyboardInterrupt):
# Special-case these, just like asyncio does all over the place. They will
# pass through `fail_hard` and `_handle_stimulus_from_task`, and eventually
# be caught by special-case logic in asyncio:
# https://github.com/python/cpython/blob/v3.9.4/Lib/asyncio/events.py#L81-L82
# Any other `BaseException` types would ultimately be ignored by asyncio if
# raised here, after messing up the worker state machine along their way.
raise
except BaseException as e:
# Users _shouldn't_ use `BaseException`s, but if they do, we can assume they
# aren't a reason to shut down the whole system (since we allow the
# system-shutting-down `SystemExit` and `KeyboardInterrupt` to pass through)
msg = error_message(e)
msg["op"] = "task-erred"
msg["actual-exception"] = e
else:
msg = {
"op": "task-finished",
"status": "OK",
"result": result,
"nbytes": sizeof(result),
"type": type(result) if result is not None else None,
}

msg["start"] = m.start + time_delay
msg["stop"] = m.stop + time_delay
msg["thread"] = ident
msg["thread"] = threading.get_ident()
return msg


Expand All @@ -3033,39 +3034,38 @@ async def apply_function_async(
-------
msg: dictionary with status, result/error, timings, etc..
"""
ident = threading.get_ident()
try:
with context_meter.meter("thread-noncpu", func=time) as m:
with context_meter.meter("thread-noncpu", func=time) as m:
try:
result = await function(*args, **kwargs)
except (SystemExit, KeyboardInterrupt):
# Special-case these, just like asyncio does all over the place. They will pass
# through `fail_hard` and `_handle_stimulus_from_task`, and eventually be caught
# by special-case logic in asyncio:
# https://github.com/python/cpython/blob/v3.9.4/Lib/asyncio/events.py#L81-L82
# Any other `BaseException` types would ultimately be ignored by asyncio if
# raised here, after messing up the worker state machine along their way.
raise
except BaseException as e:
# NOTE: this includes `CancelledError`! Since it's a user task, that's _not_ a
# reason to shut down the worker.
# Users _shouldn't_ use `BaseException`s, but if they do, we can assume they
# aren't a reason to shut down the whole system (since we allow the
# system-shutting-down `SystemExit` and `KeyboardInterrupt` to pass through)
msg = error_message(e)
msg["op"] = "task-erred"
msg["actual-exception"] = e
else:
msg = {
"op": "task-finished",
"status": "OK",
"result": result,
"nbytes": sizeof(result),
"type": type(result) if result is not None else None,
}
except (SystemExit, KeyboardInterrupt):
# Special-case these, just like asyncio does all over the place. They will
# pass through `fail_hard` and `_handle_stimulus_from_task`, and eventually
# be caught by special-case logic in asyncio:
# https://github.com/python/cpython/blob/v3.9.4/Lib/asyncio/events.py#L81-L82
# Any other `BaseException` types would ultimately be ignored by asyncio if
# raised here, after messing up the worker state machine along their way.
raise
except BaseException as e:
# NOTE: this includes `CancelledError`! Since it's a user task, that's _not_
# a reason to shut down the worker.
# Users _shouldn't_ use `BaseException`s, but if they do, we can assume they
# aren't a reason to shut down the whole system (since we allow the
# system-shutting-down `SystemExit` and `KeyboardInterrupt` to pass through)
msg = error_message(e)
msg["op"] = "task-erred"
msg["actual-exception"] = e
else:
msg = {
"op": "task-finished",
"status": "OK",
"result": result,
"nbytes": sizeof(result),
"type": type(result) if result is not None else None,
}

msg["start"] = m.start + time_delay
msg["stop"] = m.stop + time_delay
msg["thread"] = ident
msg["thread"] = threading.get_ident()
return msg


Expand Down

0 comments on commit 119c7d8

Please sign in to comment.