Skip to content

Commit c9bc458

Browse files
authored
gh-91048: Add ability to list all pending asyncio tasks in a process remotely (#132807)
1 parent 926ff69 commit c9bc458

File tree

4 files changed

+398
-1
lines changed

4 files changed

+398
-1
lines changed

Lib/test/test_external_inspection.py

+122-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import textwrap
44
import importlib
55
import sys
6-
from test.support import os_helper, SHORT_TIMEOUT
6+
from test.support import os_helper, SHORT_TIMEOUT, busy_retry
77
from test.support.script_helper import make_script
88

99
import subprocess
@@ -14,6 +14,7 @@
1414
from _testexternalinspection import PROCESS_VM_READV_SUPPORTED
1515
from _testexternalinspection import get_stack_trace
1616
from _testexternalinspection import get_async_stack_trace
17+
from _testexternalinspection import get_all_awaited_by
1718
except ImportError:
1819
raise unittest.SkipTest(
1920
"Test only runs when _testexternalinspection is available")
@@ -349,6 +350,126 @@ async def main():
349350
]
350351
self.assertEqual(stack_trace, expected_stack_trace)
351352

353+
@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux",
354+
"Test only runs on Linux and MacOS")
355+
@unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
356+
"Test only runs on Linux with process_vm_readv support")
357+
def test_async_global_awaited_by(self):
358+
script = textwrap.dedent("""\
359+
import asyncio
360+
import os
361+
import random
362+
import sys
363+
from string import ascii_lowercase, digits
364+
from test.support import socket_helper, SHORT_TIMEOUT
365+
366+
HOST = '127.0.0.1'
367+
PORT = socket_helper.find_unused_port()
368+
connections = 0
369+
370+
class EchoServerProtocol(asyncio.Protocol):
371+
def connection_made(self, transport):
372+
global connections
373+
connections += 1
374+
self.transport = transport
375+
376+
def data_received(self, data):
377+
self.transport.write(data)
378+
self.transport.close()
379+
380+
async def echo_client(message):
381+
reader, writer = await asyncio.open_connection(HOST, PORT)
382+
writer.write(message.encode())
383+
await writer.drain()
384+
385+
data = await reader.read(100)
386+
assert message == data.decode()
387+
writer.close()
388+
await writer.wait_closed()
389+
await asyncio.sleep(SHORT_TIMEOUT)
390+
391+
async def echo_client_spam(server):
392+
async with asyncio.TaskGroup() as tg:
393+
while connections < 1000:
394+
msg = list(ascii_lowercase + digits)
395+
random.shuffle(msg)
396+
tg.create_task(echo_client("".join(msg)))
397+
await asyncio.sleep(0)
398+
# at least a 1000 tasks created
399+
fifo_path = sys.argv[1]
400+
with open(fifo_path, "w") as fifo:
401+
fifo.write("ready")
402+
# at this point all client tasks completed without assertion errors
403+
# let's wrap up the test
404+
server.close()
405+
await server.wait_closed()
406+
407+
async def main():
408+
loop = asyncio.get_running_loop()
409+
server = await loop.create_server(EchoServerProtocol, HOST, PORT)
410+
async with server:
411+
async with asyncio.TaskGroup() as tg:
412+
tg.create_task(server.serve_forever(), name="server task")
413+
tg.create_task(echo_client_spam(server), name="echo client spam")
414+
415+
asyncio.run(main())
416+
""")
417+
stack_trace = None
418+
with os_helper.temp_dir() as work_dir:
419+
script_dir = os.path.join(work_dir, "script_pkg")
420+
os.mkdir(script_dir)
421+
fifo = f"{work_dir}/the_fifo"
422+
os.mkfifo(fifo)
423+
script_name = _make_test_script(script_dir, 'script', script)
424+
try:
425+
p = subprocess.Popen([sys.executable, script_name, str(fifo)])
426+
with open(fifo, "r") as fifo_file:
427+
response = fifo_file.read()
428+
self.assertEqual(response, "ready")
429+
for _ in busy_retry(SHORT_TIMEOUT):
430+
try:
431+
all_awaited_by = get_all_awaited_by(p.pid)
432+
except RuntimeError as re:
433+
# This call reads a linked list in another process with
434+
# no synchronization. That occasionally leads to invalid
435+
# reads. Here we avoid making the test flaky.
436+
msg = str(re)
437+
if msg.startswith("Task list appears corrupted"):
438+
continue
439+
elif msg.startswith("Invalid linked list structure reading remote memory"):
440+
continue
441+
elif msg.startswith("Unknown error reading memory"):
442+
continue
443+
elif msg.startswith("Unhandled frame owner"):
444+
continue
445+
raise # Unrecognized exception, safest not to ignore it
446+
else:
447+
break
448+
# expected: a list of two elements: 1 thread, 1 interp
449+
self.assertEqual(len(all_awaited_by), 2)
450+
# expected: a tuple with the thread ID and the awaited_by list
451+
self.assertEqual(len(all_awaited_by[0]), 2)
452+
# expected: no tasks in the fallback per-interp task list
453+
self.assertEqual(all_awaited_by[1], (0, []))
454+
entries = all_awaited_by[0][1]
455+
# expected: at least 1000 pending tasks
456+
self.assertGreaterEqual(len(entries), 1000)
457+
# the first three tasks stem from the code structure
458+
self.assertIn(('Task-1', []), entries)
459+
self.assertIn(('server task', [[['main'], 'Task-1', []]]), entries)
460+
self.assertIn(('echo client spam', [[['main'], 'Task-1', []]]), entries)
461+
# the final task will have some random number, but it should for
462+
# sure be one of the echo client spam horde
463+
self.assertEqual([[['echo_client_spam'], 'echo client spam', [[['main'], 'Task-1', []]]]], entries[-1][1])
464+
except PermissionError:
465+
self.skipTest(
466+
"Insufficient permissions to read the stack trace")
467+
finally:
468+
os.remove(fifo)
469+
p.kill()
470+
p.terminate()
471+
p.wait(timeout=SHORT_TIMEOUT)
472+
352473
@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux",
353474
"Test only runs on Linux and MacOS")
354475
@unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Add ability to externally inspect all pending asyncio tasks, even if no task
2+
is currently entered on the event loop.

Modules/_asynciomodule.c

+12
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,17 @@ typedef struct _Py_AsyncioModuleDebugOffsets {
105105
uint64_t task_is_task;
106106
uint64_t task_awaited_by_is_set;
107107
uint64_t task_coro;
108+
uint64_t task_node;
108109
} asyncio_task_object;
110+
struct _asyncio_interpreter_state {
111+
uint64_t size;
112+
uint64_t asyncio_tasks_head;
113+
} asyncio_interpreter_state;
109114
struct _asyncio_thread_state {
110115
uint64_t size;
111116
uint64_t asyncio_running_loop;
112117
uint64_t asyncio_running_task;
118+
uint64_t asyncio_tasks_head;
113119
} asyncio_thread_state;
114120
} Py_AsyncioModuleDebugOffsets;
115121

@@ -121,11 +127,17 @@ GENERATE_DEBUG_SECTION(AsyncioDebug, Py_AsyncioModuleDebugOffsets _AsyncioDebug)
121127
.task_is_task = offsetof(TaskObj, task_is_task),
122128
.task_awaited_by_is_set = offsetof(TaskObj, task_awaited_by_is_set),
123129
.task_coro = offsetof(TaskObj, task_coro),
130+
.task_node = offsetof(TaskObj, task_node),
131+
},
132+
.asyncio_interpreter_state = {
133+
.size = sizeof(PyInterpreterState),
134+
.asyncio_tasks_head = offsetof(PyInterpreterState, asyncio_tasks_head),
124135
},
125136
.asyncio_thread_state = {
126137
.size = sizeof(_PyThreadStateImpl),
127138
.asyncio_running_loop = offsetof(_PyThreadStateImpl, asyncio_running_loop),
128139
.asyncio_running_task = offsetof(_PyThreadStateImpl, asyncio_running_task),
140+
.asyncio_tasks_head = offsetof(_PyThreadStateImpl, asyncio_tasks_head),
129141
}};
130142

131143
/* State of the _asyncio module */

0 commit comments

Comments
 (0)