Skip to content

Commit

Permalink
[3.12] gh-128479: fix asyncio staggered race leaking tasks, and loggi…
Browse files Browse the repository at this point in the history
…ng unhandled exception.append exception (GH-128475) (#129228)

gh-128479: fix asyncio staggered race leaking tasks, and logging unhandled exception.append exception (GH-128475)
(cherry picked from commit ec91e1c)

Co-authored-by: Thomas Grainger <[email protected]>
Co-authored-by: Peter Bierma <[email protected]>
  • Loading branch information
3 people authored Jan 23, 2025
1 parent bb7c54d commit e94939c
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 24 deletions.
72 changes: 48 additions & 24 deletions Lib/asyncio/staggered.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,27 @@ async def staggered_race(coro_fns, delay, *, loop=None):
enum_coro_fns = enumerate(coro_fns)
winner_result = None
winner_index = None
unhandled_exceptions = []
exceptions = []
running_tasks = []
running_tasks = set()
on_completed_fut = None

def task_done(task):
running_tasks.discard(task)
if (
on_completed_fut is not None
and not on_completed_fut.done()
and not running_tasks
):
on_completed_fut.set_result(None)

if task.cancelled():
return

exc = task.exception()
if exc is None:
return
unhandled_exceptions.append(exc)

async def run_one_coro(ok_to_start, previous_failed) -> None:
# in eager tasks this waits for the calling task to append this task
Expand All @@ -91,11 +110,11 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
this_failed = locks.Event()
next_ok_to_start = locks.Event()
next_task = loop.create_task(run_one_coro(next_ok_to_start, this_failed))
running_tasks.append(next_task)
running_tasks.add(next_task)
next_task.add_done_callback(task_done)
# next_task has been appended to running_tasks so next_task is ok to
# start.
next_ok_to_start.set()
assert len(running_tasks) == this_index + 2
# Prepare place to put this coroutine's exceptions if not won
exceptions.append(None)
assert len(exceptions) == this_index + 1
Expand All @@ -120,31 +139,36 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
# up as done() == True, cancelled() == False, exception() ==
# asyncio.CancelledError. This behavior is specified in
# https://bugs.python.org/issue30048
for i, t in enumerate(running_tasks):
if i != this_index:
current_task = tasks.current_task(loop)
for t in running_tasks:
if t is not current_task:
t.cancel()

ok_to_start = locks.Event()
first_task = loop.create_task(run_one_coro(ok_to_start, None))
running_tasks.append(first_task)
# first_task has been appended to running_tasks so first_task is ok to start.
ok_to_start.set()
propagate_cancellation_error = None
try:
# Wait for a growing list of tasks to all finish: poor man's version of
# curio's TaskGroup or trio's nursery
done_count = 0
while done_count != len(running_tasks):
done, _ = await tasks.wait(running_tasks)
done_count = len(done)
ok_to_start = locks.Event()
first_task = loop.create_task(run_one_coro(ok_to_start, None))
running_tasks.add(first_task)
first_task.add_done_callback(task_done)
# first_task has been appended to running_tasks so first_task is ok to start.
ok_to_start.set()
propagate_cancellation_error = None
# Make sure no tasks are left running if we leave this function
while running_tasks:
on_completed_fut = loop.create_future()
try:
await on_completed_fut
except exceptions_mod.CancelledError as ex:
propagate_cancellation_error = ex
for task in running_tasks:
task.cancel(*ex.args)
on_completed_fut = None
if __debug__ and unhandled_exceptions:
# If run_one_coro raises an unhandled exception, it's probably a
# programming error, and I want to see it.
if __debug__:
for d in done:
if d.done() and not d.cancelled() and d.exception():
raise d.exception()
raise ExceptionGroup("staggered race failed", unhandled_exceptions)
if propagate_cancellation_error is not None:
raise propagate_cancellation_error
return winner_result, winner_index, exceptions
finally:
del exceptions
# Make sure no tasks are left running if we leave this function
for t in running_tasks:
t.cancel()
del exceptions, propagate_cancellation_error, unhandled_exceptions
27 changes: 27 additions & 0 deletions Lib/test/test_asyncio/test_staggered.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,30 @@ async def do_set():
self.assertIsNone(excs[0], None)
self.assertIsInstance(excs[1], asyncio.CancelledError)
self.assertIsInstance(excs[2], asyncio.CancelledError)


async def test_cancelled(self):
log = []
with self.assertRaises(TimeoutError):
async with asyncio.timeout(None) as cs_outer, asyncio.timeout(None) as cs_inner:
async def coro_fn():
cs_inner.reschedule(-1)
await asyncio.sleep(0)
try:
await asyncio.sleep(0)
except asyncio.CancelledError:
log.append("cancelled 1")

cs_outer.reschedule(-1)
await asyncio.sleep(0)
try:
await asyncio.sleep(0)
except asyncio.CancelledError:
log.append("cancelled 2")
try:
await staggered_race([coro_fn], delay=None)
except asyncio.CancelledError:
log.append("cancelled 3")
raise

self.assertListEqual(log, ["cancelled 1", "cancelled 2", "cancelled 3"])
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix :func:`!asyncio.staggered.staggered_race` leaking tasks and issuing an unhandled exception.

0 comments on commit e94939c

Please sign in to comment.