Skip to content

Commit a7beb03

Browse files
authored
fix(executor): Fix the issue of cleaning up sessions (#186)
1 parent e59c69e commit a7beb03

File tree

3 files changed

+0
-108
lines changed

3 files changed

+0
-108
lines changed

executor/agents/agno/agno_agent.py

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,6 @@ async def _async_execute(self) -> TaskStatus:
337337
# Checkpoint 1: Check cancellation before execution starts
338338
if self.task_state_manager.is_cancelled(self.task_id):
339339
logger.info(f"Task {self.task_id} cancelled before execution")
340-
await self._cleanup_resources()
341340
return TaskStatus.COMPLETED
342341

343342
progress = 65
@@ -379,7 +378,6 @@ async def _async_execute(self) -> TaskStatus:
379378
# Checkpoint 2: Check cancellation after team/agent creation
380379
if self.task_state_manager.is_cancelled(self.task_id):
381380
logger.info(f"Task {self.task_id} cancelled after team/agent creation")
382-
await self._cleanup_resources()
383381
return TaskStatus.COMPLETED
384382

385383
# Prepare prompt
@@ -701,7 +699,6 @@ async def _run_agent_streaming_async(self, prompt: str) -> TaskStatus:
701699
# Checkpoint: Check cancellation during streaming
702700
if self.task_state_manager.is_cancelled(self.task_id):
703701
logger.info(f"Task {self.task_id} cancelled during agent streaming")
704-
await self._cleanup_resources()
705702
return TaskStatus.COMPLETED
706703

707704
result_content = await self._handle_agent_streaming_event(
@@ -710,7 +707,6 @@ async def _run_agent_streaming_async(self, prompt: str) -> TaskStatus:
710707

711708
# Check if task was cancelled
712709
if self.task_state_manager.is_cancelled(self.task_id):
713-
await self._cleanup_resources()
714710
return TaskStatus.COMPLETED
715711

716712
return self._handle_execution_result(result_content, "agent streaming execution")
@@ -813,7 +809,6 @@ async def _run_team_streaming_async(self, prompt: str) -> TaskStatus:
813809
# Checkpoint: Check cancellation during streaming
814810
if self.task_state_manager.is_cancelled(self.task_id):
815811
logger.info(f"Task {self.task_id} cancelled during team streaming")
816-
await self._cleanup_resources()
817812
return TaskStatus.COMPLETED
818813

819814
result_content, reasoning = await self._handle_team_streaming_event(
@@ -824,7 +819,6 @@ async def _run_team_streaming_async(self, prompt: str) -> TaskStatus:
824819

825820
# Check if task was cancelled
826821
if self.task_state_manager.is_cancelled(self.task_id):
827-
await self._cleanup_resources()
828822
return TaskStatus.COMPLETED
829823

830824
return self._handle_execution_result(result_content, "team streaming execution")
@@ -1118,16 +1112,6 @@ def cancel_run(self) -> bool:
11181112
self.task_state_manager.set_state(self.task_id, TaskState.CANCELLED)
11191113
return False
11201114

1121-
async def _cleanup_resources(self) -> None:
1122-
"""
1123-
Clean up resources when task is cancelled
1124-
"""
1125-
try:
1126-
logger.info(f"Cleaning up resources for task {self.task_id}")
1127-
await self.resource_manager.cleanup_task_resources(self.task_id)
1128-
except Exception as e:
1129-
logger.exception(f"Error cleaning up resources for task {self.task_id}: {str(e)}")
1130-
11311115
async def cleanup(self) -> None:
11321116
"""
11331117
Clean up resources used by the agent

executor/agents/claude_code/claude_code_agent.py

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -693,7 +693,6 @@ async def _async_execute(self) -> TaskStatus:
693693
# Check if task was cancelled before execution
694694
if self.task_state_manager.is_cancelled(self.task_id):
695695
logger.info(f"Task {self.task_id} was cancelled before execution")
696-
await self._cleanup_resources()
697696
return TaskStatus.COMPLETED
698697

699698
progress = 65
@@ -768,14 +767,12 @@ async def _async_execute(self) -> TaskStatus:
768767
self.resource_manager.register_resource(
769768
task_id=self.task_id,
770769
resource_id=f"claude_client_{self.session_id}",
771-
cleanup_func=self._cleanup_client,
772770
is_async=True
773771
)
774772

775773
# Check cancellation again before proceeding
776774
if self.task_state_manager.is_cancelled(self.task_id):
777775
logger.info(f"Task {self.task_id} cancelled during client setup")
778-
await self._cleanup_resources()
779776
return TaskStatus.COMPLETED
780777

781778
# Prepare prompt
@@ -790,7 +787,6 @@ async def _async_execute(self) -> TaskStatus:
790787
# Check cancellation before sending query
791788
if self.task_state_manager.is_cancelled(self.task_id):
792789
logger.info(f"Task {self.task_id} cancelled before sending query")
793-
await self._cleanup_resources()
794790
return TaskStatus.COMPLETED
795791

796792
# Use session_id to send messages, ensuring messages are in the same session
@@ -1012,32 +1008,7 @@ async def _async_cancel_run(self) -> None:
10121008
except Exception as e:
10131009
logger.exception(f"Error during async interrupt for session_id {self.session_id}: {str(e)}")
10141010

1015-
async def _cleanup_client(self) -> None:
1016-
"""Clean up client connection and remove from cache"""
1017-
try:
1018-
if self.client and hasattr(self.client, 'close'):
1019-
await self.client.close()
1020-
logger.info(f"Closed client for task {self.task_id}")
10211011

1022-
# Remove client from cache to prevent reuse of terminated client
1023-
if self.session_id in self._clients:
1024-
del self._clients[self.session_id]
1025-
logger.info(f"Removed client from cache for session_id: {self.session_id}")
1026-
except Exception as e:
1027-
logger.exception(f"Error closing client for task {self.task_id}: {e}")
1028-
1029-
async def _cleanup_resources(self) -> None:
1030-
"""Clean up all task resources"""
1031-
try:
1032-
# Use resource manager to clean up all registered resources
1033-
await self.resource_manager.cleanup_task_resources(self.task_id)
1034-
1035-
# Clean up task state
1036-
self.task_state_manager.cleanup(self.task_id)
1037-
1038-
logger.info(f"Completed resource cleanup for task {self.task_id}")
1039-
except Exception as e:
1040-
logger.exception(f"Error in resource cleanup for task {self.task_id}: {e}")
10411012
def _setup_claudecode_dir(self, project_path: str, custom_rules: Dict[str, str]) -> None:
10421013
"""
10431014
Setup .claudecode directory with custom instruction files for Claude Code compatibility

executor/tasks/resource_manager.py

Lines changed: 0 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,6 @@
2424
class ResourceHandle:
2525
"""Resource handle"""
2626
resource_id: str
27-
cleanup_func: Callable
28-
cleanup_args: tuple = field(default_factory=tuple)
29-
cleanup_kwargs: dict = field(default_factory=dict)
3027
is_async: bool = False
3128

3229

@@ -53,9 +50,6 @@ def register_resource(
5350
self,
5451
task_id: int,
5552
resource_id: str,
56-
cleanup_func: Callable,
57-
cleanup_args: tuple = (),
58-
cleanup_kwargs: Optional[dict] = None,
5953
is_async: bool = False
6054
) -> None:
6155
"""
@@ -64,23 +58,15 @@ def register_resource(
6458
Args:
6559
task_id: Task ID
6660
resource_id: Unique resource identifier
67-
cleanup_func: Cleanup function
68-
cleanup_args: Positional arguments for cleanup function
69-
cleanup_kwargs: Keyword arguments for cleanup function
7061
is_async: Whether cleanup function is asynchronous
7162
"""
72-
if cleanup_kwargs is None:
73-
cleanup_kwargs = {}
7463

7564
with self._resource_lock:
7665
if task_id not in self._resources:
7766
self._resources[task_id] = []
7867

7968
handle = ResourceHandle(
8069
resource_id=resource_id,
81-
cleanup_func=cleanup_func,
82-
cleanup_args=cleanup_args,
83-
cleanup_kwargs=cleanup_kwargs,
8470
is_async=is_async
8571
)
8672
self._resources[task_id].append(handle)
@@ -104,55 +90,6 @@ def unregister_resource(self, task_id: int, resource_id: str) -> None:
10490
if len(self._resources[task_id]) < original_count:
10591
logger.debug(f"Unregistered resource '{resource_id}' for task {task_id}")
10692

107-
async def cleanup_task_resources(self, task_id: int) -> None:
108-
"""
109-
Clean up all resources for a task (async version)
110-
111-
Args:
112-
task_id: Task ID
113-
"""
114-
resources = []
115-
with self._resource_lock:
116-
resources = self._resources.pop(task_id, [])
117-
118-
if not resources:
119-
logger.debug(f"No resources to cleanup for task {task_id}")
120-
return
121-
122-
logger.info(f"Cleaning up {len(resources)} resources for task {task_id}")
123-
124-
# Cleanup in reverse order, last registered cleaned up first
125-
for handle in reversed(resources):
126-
try:
127-
if handle.is_async:
128-
await handle.cleanup_func(*handle.cleanup_args, **handle.cleanup_kwargs)
129-
else:
130-
handle.cleanup_func(*handle.cleanup_args, **handle.cleanup_kwargs)
131-
logger.debug(f"Cleaned up resource '{handle.resource_id}' for task {task_id}")
132-
except Exception as e:
133-
logger.exception(f"Error cleaning up resource '{handle.resource_id}' for task {task_id}: {e}")
134-
135-
def cleanup_task_resources_sync(self, task_id: int) -> None:
136-
"""
137-
Clean up all resources for a task (sync version)
138-
139-
Args:
140-
task_id: Task ID
141-
"""
142-
try:
143-
# Try to get current event loop
144-
loop = asyncio.get_event_loop()
145-
if loop.is_running():
146-
# If in async context, create task
147-
asyncio.create_task(self.cleanup_task_resources(task_id))
148-
logger.debug(f"Created async cleanup task for task {task_id}")
149-
else:
150-
# Otherwise run directly
151-
loop.run_until_complete(self.cleanup_task_resources(task_id))
152-
except RuntimeError:
153-
# No event loop, create new one
154-
asyncio.run(self.cleanup_task_resources(task_id))
155-
15693
def get_resource_count(self, task_id: int) -> int:
15794
"""
15895
Get count of registered resources for a task

0 commit comments

Comments
 (0)