# Phase 2: Find leaf tasks (tasks that do not await any other task)
from each leaf to root + yield from self._build_linear_stacks(leaf_task_ids, task_map, child_to_parents) + + def _build_task_graph(self, awaited_info_list): + task_map = {} + child_to_parents = {} + all_task_ids = set() + + for awaited_info in awaited_info_list: + thread_id = awaited_info.thread_id + for task_info in awaited_info.awaited_by: + task_id = task_info.task_id + task_map[task_id] = (task_info, thread_id) + all_task_ids.add(task_id) + + # Store parent task IDs (not frames - those are in task_info.coroutine_stack) + if task_info.awaited_by: + child_to_parents[task_id] = [p.task_name for p in task_info.awaited_by] + + return task_map, child_to_parents, all_task_ids + + def _find_leaf_tasks(self, child_to_parents, all_task_ids): + all_parent_ids = set() + for parent_ids in child_to_parents.values(): + all_parent_ids.update(parent_ids) + return all_task_ids - all_parent_ids + + def _build_linear_stacks(self, leaf_task_ids, task_map, child_to_parents): + for leaf_id in leaf_task_ids: + # Track yielded paths to avoid duplicates from multiple parent paths + yielded_paths = set() + + # BFS queue: (current_task_id, frames_so_far, path_for_cycle_detection, thread_id) + # Use deque for O(1) popleft instead of O(n) list.pop(0) + queue = deque([(leaf_id, [], frozenset(), None)]) + + while queue: + current_id, frames, path, thread_id = queue.popleft() + + # Cycle detection + if current_id in path: + continue + + # End of path (parent ID not in task_map) + if current_id not in task_map: + if frames: + yield frames, thread_id, leaf_id + continue + + # Process current task + task_info, tid = task_map[current_id] + + # Set thread_id from first task if not already set + if thread_id is None: + thread_id = tid + + new_frames = list(frames) + new_path = path | {current_id} + + # Add all frames from all coroutines in this task + if task_info.coroutine_stack: + for coro_info in task_info.coroutine_stack: + for frame in coro_info.call_stack: + new_frames.append(frame) + + # Add 
task boundary marker + task_name = task_info.task_name or "Task-" + str(task_info.task_id) + new_frames.append(FrameInfo(("", 0, task_name))) + + # Get parent task IDs + parent_ids = child_to_parents.get(current_id, []) + + if not parent_ids: + # Root task - yield complete stack (deduplicate) + path_sig = frozenset(new_path) + if path_sig not in yielded_paths: + yielded_paths.add(path_sig) + yield new_frames, thread_id, leaf_id + else: + # Continue to each parent (creates multiple paths if >1 parent) + for parent_id in parent_ids: + queue.append((parent_id, new_frames, new_path, thread_id)) diff --git a/Lib/profiling/sampling/pstats_collector.py b/Lib/profiling/sampling/pstats_collector.py index e06dbf40aa1d89..16bb6a4a155ce1 100644 --- a/Lib/profiling/sampling/pstats_collector.py +++ b/Lib/profiling/sampling/pstats_collector.py @@ -41,8 +41,14 @@ def _process_frames(self, frames): self.callers[callee][caller] += 1 def collect(self, stack_frames): - for frames, thread_id in self._iter_all_frames(stack_frames, skip_idle=self.skip_idle): - self._process_frames(frames) + if stack_frames and hasattr(stack_frames[0], "awaited_by"): + # Async frame processing + for frames, thread_id, task_id in self._iter_async_frames(stack_frames): + self._process_frames(frames) + else: + # Regular frame processing + for frames, thread_id in self._iter_all_frames(stack_frames, skip_idle=self.skip_idle): + self._process_frames(frames) def export(self, filename): self.create_stats() diff --git a/Lib/profiling/sampling/sample.py b/Lib/profiling/sampling/sample.py index 7a0f739a5428c6..16e4b915a4e7c6 100644 --- a/Lib/profiling/sampling/sample.py +++ b/Lib/profiling/sampling/sample.py @@ -154,7 +154,7 @@ def __init__(self, pid, sample_interval_usec, all_threads, *, mode=PROFILING_MOD self.total_samples = 0 self.realtime_stats = False - def sample(self, collector, duration_sec=10): + def sample(self, collector, duration_sec=10, *, async_aware=False): sample_interval_sec = 
self.sample_interval_usec / 1_000_000 running_time = 0 num_samples = 0 @@ -168,7 +168,10 @@ def sample(self, collector, duration_sec=10): current_time = time.perf_counter() if next_time < current_time: try: - stack_frames = self.unwinder.get_stack_trace() + if async_aware: + stack_frames = self.unwinder.get_all_awaited_by() + else: + stack_frames = self.unwinder.get_stack_trace() collector.collect(stack_frames) except ProcessLookupError: duration_sec = current_time - start_time @@ -613,6 +616,7 @@ def sample( output_format="pstats", realtime_stats=False, mode=PROFILING_MODE_WALL, + async_aware=False, ): profiler = SampleProfiler( pid, sample_interval_usec, all_threads=all_threads, mode=mode @@ -638,7 +642,7 @@ def sample( case _: raise ValueError(f"Invalid output format: {output_format}") - profiler.sample(collector, duration_sec) + profiler.sample(collector, duration_sec, async_aware=async_aware) if output_format == "pstats" and not filename: stats = pstats.SampledStats(collector).strip_dirs() @@ -706,6 +710,7 @@ def wait_for_process_and_sample(pid, sort_value, args): output_format=args.format, realtime_stats=args.realtime_stats, mode=mode, + async_aware=args.async_aware, ) @@ -760,6 +765,13 @@ def main(): help="Print real-time sampling statistics (Hz, mean, min, max, stdev) during profiling", ) + sampling_group.add_argument( + "--async-aware", + action="store_true", + default=False, + help="Enable async-aware sampling (experimental)", + ) + # Mode options mode_group = parser.add_argument_group("Mode options") mode_group.add_argument( @@ -915,6 +927,7 @@ def main(): output_format=args.format, realtime_stats=args.realtime_stats, mode=mode, + async_aware=args.async_aware, ) elif args.module or args.args: if args.module: diff --git a/Lib/profiling/sampling/stack_collector.py b/Lib/profiling/sampling/stack_collector.py index bc38151e067989..47148329ebdf44 100644 --- a/Lib/profiling/sampling/stack_collector.py +++ b/Lib/profiling/sampling/stack_collector.py @@ -15,10 
+15,18 @@ def __init__(self, *, skip_idle=False): self.skip_idle = skip_idle def collect(self, stack_frames, skip_idle=False): - for frames, thread_id in self._iter_all_frames(stack_frames, skip_idle=skip_idle): - if not frames: - continue - self.process_frames(frames, thread_id) + if stack_frames and hasattr(stack_frames[0], "awaited_by"): + # Async-aware mode: process async task frames + for frames, thread_id, task_id in self._iter_async_frames(stack_frames): + if not frames: + continue + self.process_frames(frames, thread_id) + else: + # Sync-only mode + for frames, thread_id in self._iter_all_frames(stack_frames, skip_idle=skip_idle): + if not frames: + continue + self.process_frames(frames, thread_id) def process_frames(self, frames, thread_id): pass diff --git a/Lib/test/test_profiling/test_sampling_profiler.py b/Lib/test/test_profiling/test_sampling_profiler.py index 5b924cb24531b6..9d0628ed2a2e19 100644 --- a/Lib/test/test_profiling/test_sampling_profiler.py +++ b/Lib/test/test_profiling/test_sampling_profiler.py @@ -14,13 +14,6 @@ from collections import namedtuple from unittest import mock -from profiling.sampling.pstats_collector import PstatsCollector -from profiling.sampling.stack_collector import ( - CollapsedStackCollector, - FlamegraphCollector, -) -from profiling.sampling.gecko_collector import GeckoCollector - from test.support.os_helper import unlink from test.support import force_not_colorized_test_class, SHORT_TIMEOUT from test.support.socket_helper import find_unused_port @@ -38,6 +31,12 @@ ) else: import profiling.sampling + from profiling.sampling.pstats_collector import PstatsCollector + from profiling.sampling.stack_collector import ( + CollapsedStackCollector, + FlamegraphCollector, + ) + from profiling.sampling.gecko_collector import GeckoCollector from profiling.sampling.sample import SampleProfiler @@ -76,6 +75,29 @@ def __repr__(self): return f"MockInterpreterInfo(interpreter_id={self.interpreter_id}, threads={self.threads})" +class 
MockCoroInfo: + """Mock CoroInfo for testing async tasks.""" + def __init__(self, task_name, call_stack): + self.task_name = task_name + self.call_stack = call_stack + + +class MockTaskInfo: + """Mock TaskInfo for testing async tasks.""" + def __init__(self, task_id, task_name, coroutine_stack, awaited_by=None): + self.task_id = task_id + self.task_name = task_name + self.coroutine_stack = coroutine_stack + self.awaited_by = awaited_by or [] + + +class MockAwaitedInfo: + """Mock AwaitedInfo for testing async tasks.""" + def __init__(self, thread_id, awaited_by): + self.thread_id = thread_id + self.awaited_by = awaited_by + + skip_if_not_supported = unittest.skipIf( ( sys.platform != "darwin" @@ -741,6 +763,293 @@ def test_pstats_collector_export(self): self.assertEqual(func1_stats[3], 2.0) # ct (cumulative time) +class TestAsyncStackReconstruction(unittest.TestCase): + """Test async task tree linear stack reconstruction.""" + + def _build_task_tree(self, task_specs): + """ + Helper to build task tree from simple specifications. 
+ + Args: + task_specs: List of (task_id, func_name, line, parent_ids) tuples + parent_ids can be a string or list of strings + + Returns: + List of MockTaskInfo objects + """ + tasks = {} + + # First pass: create all tasks + for task_id, func_name, line, parent_ids in task_specs: + tasks[task_id] = MockTaskInfo( + task_id=task_id, + task_name=task_id, + coroutine_stack=[MockCoroInfo(task_id, [MockFrameInfo(f"{func_name}.py", line, func_name)])], + awaited_by=[] + ) + + # Second pass: link parents + for task_id, func_name, line, parent_ids in task_specs: + if parent_ids: + if isinstance(parent_ids, str): + parent_ids = [parent_ids] + for parent_id in parent_ids: + tasks[task_id].awaited_by.append( + MockTaskInfo(parent_id, parent_id, [], []) + ) + + return list(tasks.values()) + + def test_single_parent_chain(self): + """Test async stack reconstruction with normal single-parent chains.""" + collector = PstatsCollector(sample_interval_usec=1000) + + # Build task tree: + # Task-1 -> Worker-A -> LeafA-1 + # Task-1 -> Worker-B -> LeafB-1 + tasks = self._build_task_tree([ + ("LeafA-1", "leaf_work", 10, "Worker-A"), + ("LeafB-1", "leaf_work", 10, "Worker-B"), + ("Worker-A", "worker_a", 20, "Task-1"), + ("Worker-B", "worker_b", 25, "Task-1"), + ("Task-1", "main", 30, None), + ]) + + awaited_info_list = [MockAwaitedInfo(thread_id=123, awaited_by=tasks)] + + stacks = list(collector._iter_async_frames(awaited_info_list)) + + # Should get 2 stacks (one for LeafA-1, one for LeafB-1) + self.assertEqual(len(stacks), 2) + + for frames, thread_id, leaf_id in stacks: + self.assertEqual(thread_id, 123) + self.assertIn(leaf_id, ["LeafA-1", "LeafB-1"]) + + frame_names = [f.funcname for f in frames] + self.assertIn("leaf_work", frame_names) + self.assertIn("main", frame_names) + + # Check task markers + task_markers = [f.funcname for f in frames if f.filename == ""] + self.assertEqual(len(task_markers), 3) # LeafX, Worker-X, Task-1 + + def test_multiple_parents_diamond(self): + """Test 
async stack reconstruction with diamond dependency (multiple parents).""" + collector = PstatsCollector(sample_interval_usec=1000) + + # Build task tree: + # Task-1 + # / \ + # Parent-1 Parent-2 + # \ / + # SharedChild + tasks = self._build_task_tree([ + ("SharedChild", "shared_child", 10, ["Parent-1", "Parent-2"]), + ("Parent-1", "parent_1", 20, "Task-1"), + ("Parent-2", "parent_2", 30, "Task-1"), + ("Task-1", "main", 40, None), + ]) + + awaited_info_list = [MockAwaitedInfo(thread_id=456, awaited_by=tasks)] + + stacks = list(collector._iter_async_frames(awaited_info_list)) + + # Should get 2 stacks (one for each path from SharedChild to Task-1) + self.assertEqual(len(stacks), 2) + + # Both stacks should have SharedChild as the leaf + for frames, thread_id, leaf_id in stacks: + self.assertEqual(thread_id, 456) + self.assertEqual(leaf_id, "SharedChild") + + frame_names = [f.funcname for f in frames] + self.assertIn("shared_child", frame_names) + self.assertIn("main", frame_names) + + # Each stack should have EITHER parent_1 OR parent_2 (not both) + has_parent_1 = "parent_1" in frame_names + has_parent_2 = "parent_2" in frame_names + self.assertTrue(has_parent_1 or has_parent_2) + self.assertFalse(has_parent_1 and has_parent_2) + + # Verify we got one path through each parent + paths = [] + for frames, _, _ in stacks: + frame_names = [f.funcname for f in frames] + if "parent_1" in frame_names: + paths.append("Parent-1") + elif "parent_2" in frame_names: + paths.append("Parent-2") + + self.assertEqual(sorted(paths), ["Parent-1", "Parent-2"]) + + def test_multiple_parents_three_levels(self): + """Test async stack reconstruction with complex 3-level diamond dependencies.""" + collector = PstatsCollector(sample_interval_usec=1000) + + # Build complex task tree with multiple diamonds: + # Root + # / \ + # Mid-A Mid-B + # / \ / \ + # Worker-1 Worker-2 Worker-3 + # \ | / + # LeafTask + tasks = self._build_task_tree([ + ("LeafTask", "leaf_work", 10, ["Worker-1", "Worker-2", 
"Worker-3"]), + ("Worker-1", "worker_1", 20, "Mid-A"), + ("Worker-2", "worker_2", 30, ["Mid-A", "Mid-B"]), + ("Worker-3", "worker_3", 40, "Mid-B"), + ("Mid-A", "mid_a", 50, "Root"), + ("Mid-B", "mid_b", 60, "Root"), + ("Root", "main", 70, None), + ]) + + awaited_info_list = [MockAwaitedInfo(thread_id=789, awaited_by=tasks)] + + stacks = list(collector._iter_async_frames(awaited_info_list)) + + # Should get 4 stacks: + # Path 1: LeafTask -> Worker-1 -> Mid-A -> Root + # Path 2: LeafTask -> Worker-2 -> Mid-A -> Root + # Path 3: LeafTask -> Worker-2 -> Mid-B -> Root + # Path 4: LeafTask -> Worker-3 -> Mid-B -> Root + self.assertEqual(len(stacks), 4) + + # All stacks should have LeafTask as the leaf + for frames, thread_id, leaf_id in stacks: + self.assertEqual(thread_id, 789) + self.assertEqual(leaf_id, "LeafTask") + + frame_names = [f.funcname for f in frames] + self.assertIn("leaf_work", frame_names) + self.assertIn("main", frame_names) + + # Verify we got all 4 unique paths + paths = [] + for frames, _, _ in stacks: + frame_names = [f.funcname for f in frames] + + # Determine which worker + worker = None + if "worker_1" in frame_names: + worker = "Worker-1" + elif "worker_2" in frame_names: + worker = "Worker-2" + elif "worker_3" in frame_names: + worker = "Worker-3" + + # Determine which mid + mid = None + if "mid_a" in frame_names: + mid = "Mid-A" + elif "mid_b" in frame_names: + mid = "Mid-B" + + if worker and mid: + paths.append((worker, mid)) + + # Should have 4 distinct paths + self.assertEqual(len(paths), 4) + expected_paths = [ + ("Worker-1", "Mid-A"), + ("Worker-2", "Mid-A"), + ("Worker-2", "Mid-B"), + ("Worker-3", "Mid-B") + ] + self.assertEqual(sorted(paths), sorted(expected_paths)) + + def test_multiple_leaves_shared_ancestors(self): + """Test async stack reconstruction with multiple leaves sharing ancestors across 4 levels.""" + collector = PstatsCollector(sample_interval_usec=1000) + + # Build complex task tree with shared ancestors: + # Root + # / \ 
+ # Coord-A Coord-B + # / \ / \ + # Worker-1 Worker-2 Worker-3 + # / \ | / \ + # Leaf-1 Leaf-2 Leaf-3 Leaf-4 Leaf-5 + tasks = self._build_task_tree([ + ("Leaf-1", "leaf_1", 10, "Worker-1"), + ("Leaf-2", "leaf_2", 20, "Worker-1"), + ("Leaf-3", "leaf_3", 30, "Worker-2"), + ("Leaf-4", "leaf_4", 40, "Worker-3"), + ("Leaf-5", "leaf_5", 50, "Worker-3"), + ("Worker-1", "worker_1", 100, "Coord-A"), + ("Worker-2", "worker_2", 200, ["Coord-A", "Coord-B"]), + ("Worker-3", "worker_3", 300, "Coord-B"), + ("Coord-A", "coord_a", 400, "Root"), + ("Coord-B", "coord_b", 500, "Root"), + ("Root", "main", 600, None), + ]) + + awaited_info_list = [MockAwaitedInfo(thread_id=999, awaited_by=tasks)] + + stacks = list(collector._iter_async_frames(awaited_info_list)) + + # Expected paths: + # Leaf-1 -> Worker-1 -> Coord-A -> Root + # Leaf-2 -> Worker-1 -> Coord-A -> Root + # Leaf-3 -> Worker-2 -> Coord-A -> Root + # Leaf-3 -> Worker-2 -> Coord-B -> Root (Worker-2 has 2 parents!) + # Leaf-4 -> Worker-3 -> Coord-B -> Root + # Leaf-5 -> Worker-3 -> Coord-B -> Root + # Total: 6 stacks + self.assertEqual(len(stacks), 6) + + # Verify all thread IDs are correct + for frames, thread_id, leaf_id in stacks: + self.assertEqual(thread_id, 999) + self.assertIn(leaf_id, ["Leaf-1", "Leaf-2", "Leaf-3", "Leaf-4", "Leaf-5"]) + + # Collect all (leaf, worker, coord) tuples + paths = [] + for frames, _, leaf_id in stacks: + frame_names = [f.funcname for f in frames] + + # All paths should have main at the root + self.assertIn("main", frame_names) + + # Determine worker + worker = None + if "worker_1" in frame_names: + worker = "Worker-1" + elif "worker_2" in frame_names: + worker = "Worker-2" + elif "worker_3" in frame_names: + worker = "Worker-3" + + # Determine coordinator + coord = None + if "coord_a" in frame_names: + coord = "Coord-A" + elif "coord_b" in frame_names: + coord = "Coord-B" + + if worker and coord: + paths.append((leaf_id, worker, coord)) + + # Verify all 6 expected paths + expected_paths = [ + 
("Leaf-1", "Worker-1", "Coord-A"), + ("Leaf-2", "Worker-1", "Coord-A"), + ("Leaf-3", "Worker-2", "Coord-A"), + ("Leaf-3", "Worker-2", "Coord-B"), # Leaf-3 appears twice due to Worker-2 diamond + ("Leaf-4", "Worker-3", "Coord-B"), + ("Leaf-5", "Worker-3", "Coord-B"), + ] + self.assertEqual(sorted(paths), sorted(expected_paths)) + + # Verify Leaf-3 has 2 different paths (through both coordinators) + leaf_3_paths = [p for p in paths if p[0] == "Leaf-3"] + self.assertEqual(len(leaf_3_paths), 2) + leaf_3_coords = {p[2] for p in leaf_3_paths} + self.assertEqual(leaf_3_coords, {"Coord-A", "Coord-B"}) + + class TestSampleProfiler(unittest.TestCase): """Test the SampleProfiler class.""" @@ -2165,7 +2474,8 @@ def test_cli_module_argument_parsing(self): show_summary=True, output_format="pstats", realtime_stats=False, - mode=0 + mode=0, + async_aware=False ) @unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist") @@ -2193,7 +2503,8 @@ def test_cli_module_with_arguments(self): show_summary=True, output_format="pstats", realtime_stats=False, - mode=0 + mode=0, + async_aware=False ) @unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist") @@ -2221,7 +2532,8 @@ def test_cli_script_argument_parsing(self): show_summary=True, output_format="pstats", realtime_stats=False, - mode=0 + mode=0, + async_aware=False ) @unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist") @@ -2321,7 +2633,8 @@ def test_cli_module_with_profiler_options(self): show_summary=True, output_format="pstats", realtime_stats=False, - mode=0 + mode=0, + async_aware=False ) @unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist") @@ -2355,7 +2668,8 @@ def test_cli_script_with_profiler_options(self): show_summary=True, output_format="collapsed", realtime_stats=False, - mode=0 + mode=0, + async_aware=False ) def test_cli_empty_module_name(self): @@ -2567,7 +2881,8 @@ def test_argument_parsing_basic(self): show_summary=True, output_format="pstats", 
realtime_stats=False, - mode=0 + mode=0, + async_aware=False ) def test_sort_options(self): diff --git a/Misc/NEWS.d/next/Library/2025-11-14-18-00-41.gh-issue-141565.Ap2bhJ.rst b/Misc/NEWS.d/next/Library/2025-11-14-18-00-41.gh-issue-141565.Ap2bhJ.rst new file mode 100644 index 00000000000000..628f1e0af033c7 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-11-14-18-00-41.gh-issue-141565.Ap2bhJ.rst @@ -0,0 +1 @@ +Add async-aware profiling to the Tachyon sampling profiler. The profiler now reconstructs and displays async task hierarchies in flamegraphs, making the output more actionable for users. Patch by Savannah Ostrowski and Pablo Galindo Salgado.