Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions Lib/profiling/sampling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@
call stack rather than tracing every function call.
"""

from .collector import Collector
from .pstats_collector import PstatsCollector
from .stack_collector import CollapsedStackCollector
from .gecko_collector import GeckoCollector
from .string_table import StringTable
# Profiling requires the _remote_debugging C extension; without it the
# sampler cannot attach to a target process, so the package exports nothing.
try:
    import _remote_debugging  # noqa: F401
except ImportError:
    __all__ = ()
else:
    from .collector import Collector
    from .pstats_collector import PstatsCollector
    from .stack_collector import CollapsedStackCollector
    from .gecko_collector import GeckoCollector
    from .string_table import StringTable

    # Defined inside the else branch so the empty fallback above is not
    # clobbered when the extension is unavailable.
    __all__ = (
        "Collector",
        "PstatsCollector",
        "CollapsedStackCollector",
        "GeckoCollector",
        "StringTable",
    )
94 changes: 94 additions & 0 deletions Lib/profiling/sampling/collector.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
from abc import ABC, abstractmethod
from collections import deque

from _remote_debugging import FrameInfo


# Enums are slow
THREAD_STATE_RUNNING = 0
Expand Down Expand Up @@ -31,3 +35,93 @@ def _iter_all_frames(self, stack_frames, skip_idle=False):
frames = thread_info.frame_info
if frames:
yield frames, thread_info.thread_id

def _iter_async_frames(self, awaited_info_list):
    """Yield (frames, thread_id, leaf_task_id) linear stacks built from
    the awaited-by task graph of an async program sample."""
    # Index every task and record which tasks await which.
    task_map, child_to_parents, all_task_ids = self._build_task_graph(
        awaited_info_list
    )

    # A leaf is a task nobody awaits; each one seeds a linear stack.
    leaves = self._find_leaf_tasks(child_to_parents, all_task_ids)

    # Walk from every leaf toward the root(s), emitting one stack per path.
    yield from self._build_linear_stacks(leaves, task_map, child_to_parents)

def _build_task_graph(self, awaited_info_list):
task_map = {}
child_to_parents = {}
all_task_ids = set()

for awaited_info in awaited_info_list:
thread_id = awaited_info.thread_id
for task_info in awaited_info.awaited_by:
task_id = task_info.task_id
task_map[task_id] = (task_info, thread_id)
all_task_ids.add(task_id)

# Store parent task IDs (not frames - those are in task_info.coroutine_stack)
if task_info.awaited_by:
child_to_parents[task_id] = [p.task_name for p in task_info.awaited_by]

return task_map, child_to_parents, all_task_ids

def _find_leaf_tasks(self, child_to_parents, all_task_ids):
all_parent_ids = set()
for parent_ids in child_to_parents.values():
all_parent_ids.update(parent_ids)
return all_task_ids - all_parent_ids

def _build_linear_stacks(self, leaf_task_ids, task_map, child_to_parents):
    """Yield one linear stack per leaf-to-root path in the task graph.

    Each yielded item is ``(frames, thread_id, leaf_id)``: the accumulated
    frame list (leaf-most frames first, with a synthetic ``<task>`` marker
    after each task's coroutine frames), the thread id taken from the first
    task on the path, and the id of the leaf task that seeded the walk.
    """
    for leaf_id in leaf_task_ids:
        # Track yielded paths to avoid duplicates from multiple parent paths
        # (dedup is by the *set* of visited task ids, so two paths through
        # the same tasks in a different order yield only once).
        yielded_paths = set()

        # BFS queue: (current_task_id, frames_so_far, path_for_cycle_detection, thread_id)
        # Use deque for O(1) popleft instead of O(n) list.pop(0)
        queue = deque([(leaf_id, [], frozenset(), None)])

        while queue:
            current_id, frames, path, thread_id = queue.popleft()

            # Cycle detection: drop any path that revisits a task id.
            if current_id in path:
                continue

            # End of path (parent ID not in task_map): the chain walked off
            # the sampled graph; emit whatever frames were collected so far.
            if current_id not in task_map:
                if frames:
                    yield frames, thread_id, leaf_id
                continue

            # Process current task
            task_info, tid = task_map[current_id]

            # Set thread_id from first task if not already set
            if thread_id is None:
                thread_id = tid

            # Copy, never mutate: sibling queue entries share `frames`/`path`.
            new_frames = list(frames)
            new_path = path | {current_id}

            # Add all frames from all coroutines in this task
            if task_info.coroutine_stack:
                for coro_info in task_info.coroutine_stack:
                    for frame in coro_info.call_stack:
                        new_frames.append(frame)

            # Add task boundary marker so renderers can show task switches.
            task_name = task_info.task_name or "Task-" + str(task_info.task_id)
            new_frames.append(FrameInfo(("<task>", 0, task_name)))

            # Get parent task IDs
            parent_ids = child_to_parents.get(current_id, [])

            if not parent_ids:
                # Root task - yield complete stack (deduplicate)
                # NOTE(review): new_path is already a frozenset, so this
                # conversion looks redundant — kept as-is for safety.
                path_sig = frozenset(new_path)
                if path_sig not in yielded_paths:
                    yielded_paths.add(path_sig)
                    yield new_frames, thread_id, leaf_id
            else:
                # Continue to each parent (creates multiple paths if >1 parent)
                for parent_id in parent_ids:
                    queue.append((parent_id, new_frames, new_path, thread_id))
10 changes: 8 additions & 2 deletions Lib/profiling/sampling/pstats_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,14 @@ def _process_frames(self, frames):
self.callers[callee][caller] += 1

def collect(self, stack_frames):
    """Accumulate one sample, dispatching on sync vs async input shape."""
    # Output of get_all_awaited_by() carries an `awaited_by` attribute;
    # output of get_stack_trace() does not.
    is_async = bool(stack_frames) and hasattr(stack_frames[0], "awaited_by")

    if is_async:
        # Async frame processing
        for frames, _thread_id, _task_id in self._iter_async_frames(stack_frames):
            self._process_frames(frames)
        return

    # Regular frame processing
    sync_iter = self._iter_all_frames(stack_frames, skip_idle=self.skip_idle)
    for frames, _thread_id in sync_iter:
        self._process_frames(frames)

def export(self, filename):
self.create_stats()
Expand Down
19 changes: 16 additions & 3 deletions Lib/profiling/sampling/sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def __init__(self, pid, sample_interval_usec, all_threads, *, mode=PROFILING_MOD
self.total_samples = 0
self.realtime_stats = False

def sample(self, collector, duration_sec=10):
def sample(self, collector, duration_sec=10, *, async_aware=False):
sample_interval_sec = self.sample_interval_usec / 1_000_000
running_time = 0
num_samples = 0
Expand All @@ -168,7 +168,10 @@ def sample(self, collector, duration_sec=10):
current_time = time.perf_counter()
if next_time < current_time:
try:
stack_frames = self.unwinder.get_stack_trace()
if async_aware:
stack_frames = self.unwinder.get_all_awaited_by()
else:
stack_frames = self.unwinder.get_stack_trace()
collector.collect(stack_frames)
except ProcessLookupError:
duration_sec = current_time - start_time
Expand Down Expand Up @@ -613,6 +616,7 @@ def sample(
output_format="pstats",
realtime_stats=False,
mode=PROFILING_MODE_WALL,
async_aware=False,
):
profiler = SampleProfiler(
pid, sample_interval_usec, all_threads=all_threads, mode=mode
Expand All @@ -638,7 +642,7 @@ def sample(
case _:
raise ValueError(f"Invalid output format: {output_format}")

profiler.sample(collector, duration_sec)
profiler.sample(collector, duration_sec, async_aware=async_aware)

if output_format == "pstats" and not filename:
stats = pstats.SampledStats(collector).strip_dirs()
Expand Down Expand Up @@ -706,6 +710,7 @@ def wait_for_process_and_sample(pid, sort_value, args):
output_format=args.format,
realtime_stats=args.realtime_stats,
mode=mode,
async_aware=args.async_aware,
)


Expand Down Expand Up @@ -760,6 +765,13 @@ def main():
help="Print real-time sampling statistics (Hz, mean, min, max, stdev) during profiling",
)

sampling_group.add_argument(
"--async-aware",
action="store_true",
default=False,
help="Enable async-aware sampling (experimental)",
)

# Mode options
mode_group = parser.add_argument_group("Mode options")
mode_group.add_argument(
Expand Down Expand Up @@ -915,6 +927,7 @@ def main():
output_format=args.format,
realtime_stats=args.realtime_stats,
mode=mode,
async_aware=args.async_aware,
)
elif args.module or args.args:
if args.module:
Expand Down
16 changes: 12 additions & 4 deletions Lib/profiling/sampling/stack_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,18 @@ def __init__(self, *, skip_idle=False):
self.skip_idle = skip_idle

def collect(self, stack_frames, skip_idle=False):
    """Record one sample; handles both async task graphs and plain stacks."""
    # get_all_awaited_by() results expose `awaited_by`; plain stack traces
    # from get_stack_trace() do not.
    if stack_frames and hasattr(stack_frames[0], "awaited_by"):
        # Async-aware mode: process async task frames
        for frames, thread_id, _task_id in self._iter_async_frames(stack_frames):
            if frames:
                self.process_frames(frames, thread_id)
    else:
        # Sync-only mode
        for frames, thread_id in self._iter_all_frames(
            stack_frames, skip_idle=skip_idle
        ):
            if frames:
                self.process_frames(frames, thread_id)

def process_frames(self, frames, thread_id):
pass
Expand Down
Loading
Loading