11 changes: 10 additions & 1 deletion backend/app/api/endpoints/adapter/tasks.py
@@ -111,4 +111,13 @@ def delete_task(
):
"""Delete task"""
task_kinds_service.delete_task(db=db, task_id=task_id, user_id=current_user.id)
return {"message": "Task deleted successfully"}
return {"message": "Task deleted successfully"}

@router.post("/{task_id}/cancel", response_model=TaskInDB)
def cancel_task(
task_id: int,
current_user: User = Depends(security.get_current_user),
db: Session = Depends(get_db)
):
"""Cancel task - stop execution without deleting executor"""
return task_kinds_service.cancel_task(db=db, task_id=task_id, user_id=current_user.id)
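
For illustration, a minimal client-side call to the new cancel endpoint. This is a sketch only: the base URL and the bearer-token auth are assumptions that do not appear in this diff, while the /tasks/{id}/cancel path mirrors the frontend client further below.

import requests

API_BASE = "http://localhost:8000/api"  # hypothetical; not defined in this diff

def cancel_task(task_id: int, token: str) -> dict:
    # Auth scheme is assumed; adjust to the actual deployment.
    url = f"{API_BASE}/tasks/{task_id}/cancel"
    resp = requests.post(url, headers={"Authorization": f"Bearer {token}"}, timeout=30)
    resp.raise_for_status()
    return resp.json()  # serialized task (response_model=TaskInDB)

On success the task and its RUNNING or PENDING subtasks move to CANCELLED while the executor container is left running, so the same session can continue afterwards.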
1 change: 1 addition & 0 deletions backend/app/core/config.py
@@ -17,6 +17,7 @@ class Settings(BaseSettings):

# Executor configuration
EXECUTOR_DELETE_TASK_URL: str = "http://localhost:8001/executor-manager/executor/delete"
EXECUTOR_STOP_TASK_URL: str = "http://localhost:8001/executor-manager/executor/stop"

# JWT configuration
SECRET_KEY: str = "secret-key"
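
Because Settings extends pydantic's BaseSettings, the new URL can also be supplied through the environment instead of editing the default. A minimal sketch, assuming no env_prefix is configured; the host and port shown are purely illustrative.

import os
from app.core.config import Settings

# Hypothetical override; BaseSettings matches environment variables to field names.
os.environ["EXECUTOR_STOP_TASK_URL"] = "http://executor-manager:8001/executor-manager/executor/stop"

settings = Settings()
print(settings.EXECUTOR_STOP_TASK_URL)  # prints the overridden URL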
67 changes: 66 additions & 1 deletion backend/app/services/adapters/executor_kinds.py
@@ -638,7 +638,7 @@ def delete_executor_task_sync(self, executor_name: str, executor_namespace: str)
async def delete_executor_task_async(self, executor_name: str, executor_namespace: str) -> Dict:
"""
Asynchronous version of delete_executor_task

Args:
executor_name: The executor task name to delete
executor_namespace: Executor namespace (required)
@@ -666,5 +666,70 @@ async def delete_executor_task_async(self, executor_name: str, executor_namespac
detail=f"Error deleting executor task: {str(e)}"
)

def stop_executor_task_sync(self, executor_name: str, executor_namespace: str) -> Dict:
"""
Synchronous version of stop_executor_task to avoid event loop issues
Stop the executor task without deleting the container

Args:
executor_name: The executor task name to stop
executor_namespace: Executor namespace (required)
"""
if not executor_name:
raise HTTPException(status_code=400, detail="executor_name is required")
try:
import requests
payload = {
"executor_name": executor_name,
"executor_namespace": executor_namespace,
}
logger.info(f"executor.stop sync request url={settings.EXECUTOR_STOP_TASK_URL} {payload}")

response = requests.post(
settings.EXECUTOR_STOP_TASK_URL,
json=payload,
headers={"Content-Type": "application/json"},
timeout=30.0
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
raise HTTPException(
status_code=500,
detail=f"Error stopping executor task: {str(e)}"
)

async def stop_executor_task_async(self, executor_name: str, executor_namespace: str) -> Dict:
"""
Asynchronous version of stop_executor_task
Stop the executor task without deleting the container

Args:
executor_name: The executor task name to stop
executor_namespace: Executor namespace (required)
"""
if not executor_name:
raise HTTPException(status_code=400, detail="executor_name is required")
try:
payload = {
"executor_name": executor_name,
"executor_namespace": executor_namespace,
}
logger.info(f"executor.stop async request url={settings.EXECUTOR_STOP_TASK_URL} {payload}")

async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
settings.EXECUTOR_STOP_TASK_URL,
json=payload,
headers={"Content-Type": "application/json"}
)
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
raise HTTPException(
status_code=500,
detail=f"Error stopping executor task: {str(e)}"
)


executor_kinds_service = ExecutorKindsService(Kind)
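
As a usage note, a minimal sketch of how a caller might choose between the two variants. Only executor_kinds_service and its two stop methods come from this diff; the wrapper function names are illustrative.

from app.services.adapters.executor_kinds import executor_kinds_service

def stop_from_sync_code(name: str, namespace: str) -> dict:
    # Synchronous call path (e.g., a plain service method such as cancel_task):
    # the requests-based variant avoids touching the event loop.
    return executor_kinds_service.stop_executor_task_sync(name, namespace)

async def stop_from_async_code(name: str, namespace: str) -> dict:
    # Asynchronous call path (e.g., an async endpoint): await the httpx-based variant.
    return await executor_kinds_service.stop_executor_task_async(name, namespace)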
107 changes: 103 additions & 4 deletions backend/app/services/adapters/task_kinds.py
@@ -581,13 +581,13 @@ def delete_task(
task_subtasks = db.query(Subtask).filter(
Subtask.task_id == task_id
).all()

# Collect unique executor keys to avoid duplicate calls (namespace + name)
unique_executor_keys = set()
for subtask in task_subtasks:
if subtask.executor_name and not subtask.executor_deleted_at:
unique_executor_keys.add((subtask.executor_namespace, subtask.executor_name))

# Stop running subtasks on executor (deduplicated by (namespace, name))
for executor_namespace, executor_name in unique_executor_keys:
try:
@@ -597,14 +597,14 @@ def delete_task(
except Exception as e:
# Log error but continue with status update
logger.warning(f"Failed to delete executor task ns={executor_namespace} name={executor_name}: {str(e)}")

# Update all subtasks to DELETE status
db.query(Subtask).filter(Subtask.task_id == task_id).update({
Subtask.executor_deleted_at: True,
Subtask.status: SubtaskStatus.DELETE,
Subtask.updated_at: datetime.now()
})

# Update task status to DELETE
task_crd = Task.model_validate(task.json)
if task_crd.status:
@@ -618,6 +618,105 @@ def delete_task(

db.commit()

def cancel_task(
self, db: Session, *, task_id: int, user_id: int
) -> Dict[str, Any]:
"""
Cancel user Task and stop running subtasks without deleting the executor
Similar to pause, allows user to continue conversation in the same session
"""
logger.info(f"Cancelling task with id: {task_id}")

# Get and validate task
task = db.query(Kind).filter(
Kind.id == task_id,
Kind.user_id == user_id,
Kind.kind == "Task",
Kind.is_active == True
).first()

if not task:
raise HTTPException(
status_code=404,
detail="Task not found"
)

# Validate task ownership
task_crd = Task.model_validate(task.json)
current_status = task_crd.status.status if task_crd.status else "PENDING"

# Only RUNNING or PENDING tasks can be cancelled
if current_status not in ["RUNNING", "PENDING"]:
raise HTTPException(
status_code=400,
detail=f"Task with status {current_status} cannot be cancelled. Only RUNNING or PENDING tasks can be cancelled."
)

try:
# Update task status to CANCELLED using optimistic locking
if task_crd.status:
# Only update if still in RUNNING or PENDING status
if task_crd.status.status in ["RUNNING", "PENDING"]:
task_crd.status.status = "CANCELLED"
task_crd.status.errorMessage = "User cancelled"
task_crd.status.completedAt = datetime.now()
task_crd.status.updatedAt = datetime.now()
else:
raise HTTPException(
status_code=400,
detail=f"Task status changed to {task_crd.status.status}, cannot cancel"
)

task.json = task_crd.model_dump(mode='json', exclude_none=True)
task.updated_at = datetime.now()
flag_modified(task, "json")

# Get all subtasks for the task
task_subtasks = db.query(Subtask).filter(
Subtask.task_id == task_id
).all()

# Update subtasks status to CANCELLED
for subtask in task_subtasks:
if subtask.status in [SubtaskStatus.RUNNING, SubtaskStatus.PENDING]:
subtask.status = SubtaskStatus.CANCELLED
subtask.error_message = "User cancelled"
subtask.completed_at = datetime.now()
subtask.updated_at = datetime.now()

# Collect unique executor keys to notify (namespace + name)
unique_executor_keys = set()
for subtask in task_subtasks:
if subtask.executor_name and subtask.status == SubtaskStatus.RUNNING:
unique_executor_keys.add((subtask.executor_namespace, subtask.executor_name))
Comment on lines +679 to +691

⚠️ Potential issue | 🔴 Critical

Critical logic error: Executor stop notifications will never be sent.

The code collects executor keys (lines 688-691) by checking subtask.status == SubtaskStatus.RUNNING, but this happens after all RUNNING subtasks have been updated to CANCELLED (lines 680-685). This means unique_executor_keys will always be empty, and no stop notifications will be sent to executors.

Apply this diff to fix the ordering:

+            # Collect unique executor keys to notify BEFORE updating status (namespace + name)
+            unique_executor_keys = set()
+            for subtask in task_subtasks:
+                if subtask.executor_name and subtask.status == SubtaskStatus.RUNNING:
+                    unique_executor_keys.add((subtask.executor_namespace, subtask.executor_name))
+
             # Update subtasks status to CANCELLED
             for subtask in task_subtasks:
                 if subtask.status in [SubtaskStatus.RUNNING, SubtaskStatus.PENDING]:
                     subtask.status = SubtaskStatus.CANCELLED
                     subtask.error_message = "User cancelled"
                     subtask.completed_at = datetime.now()
                     subtask.updated_at = datetime.now()

-            # Collect unique executor keys to notify (namespace + name)
-            unique_executor_keys = set()
-            for subtask in task_subtasks:
-                if subtask.executor_name and subtask.status == SubtaskStatus.RUNNING:
-                    unique_executor_keys.add((subtask.executor_namespace, subtask.executor_name))
-
             # Notify executors to stop task (best effort, don't delete containers)
🤖 Prompt for AI Agents
In backend/app/services/adapters/task_kinds.py around lines 679 to 691, the code
collects executor keys after mutating all RUNNING subtasks to CANCELLED so the
set is always empty; to fix, first iterate task_subtasks and collect unique
executor keys for subtasks that have executor_name and are currently RUNNING,
then in a separate loop update those same subtasks' status/error/completion
timestamps to CANCELLED; ensure you use the pre-mutation status check when
building unique_executor_keys so stop notifications will be sent.


# Notify executors to stop task (best effort, don't delete containers)
for executor_namespace, executor_name in unique_executor_keys:
try:
logger.info(f"cancelling task - stop_executor_task ns={executor_namespace} name={executor_name}")
# Use sync version to avoid event loop issues
executor_kinds_service.stop_executor_task_sync(executor_name, executor_namespace)
except Exception as e:
# Log error but continue with status update (best effort)
logger.warning(f"Failed to stop executor task ns={executor_namespace} name={executor_name}: {str(e)}")

# Commit all changes
db.commit()
db.refresh(task)

return self._convert_to_task_dict(task, db, user_id)

except HTTPException:
db.rollback()
raise
except Exception as e:
db.rollback()
logger.error(f"Error cancelling task {task_id}: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Error cancelling task: {str(e)}"
)

def create_task_id(self, db: Session, user_id: int) -> int:
"""
Create new task id using kinds table auto increment (pre-allocation mechanism)
13 changes: 13 additions & 0 deletions executor_manager/executors/base.py
@@ -33,6 +33,19 @@ def get_current_task_ids(
def delete_executor(self, pod_name: str) -> Dict[str, Any]:
pass

@abc.abstractmethod
def stop_task(self, executor_name: str) -> Dict[str, Any]:
"""
Stop the task running in the executor without deleting the executor.

Args:
executor_name: Name of the executor

Returns:
Dict containing stop result
"""
pass

@abc.abstractmethod
def get_executor_count(
self, label_selector: Optional[str] = None
77 changes: 77 additions & 0 deletions executor_manager/executors/docker/executor.py
@@ -327,6 +327,83 @@ def delete_executor(self, executor_name: str) -> Dict[str, Any]:
"error_msg": f"Error deleting container: {str(e)}"
}

def stop_task(self, executor_name: str) -> Dict[str, Any]:
"""
Stop the task running in the executor without deleting the executor.
Sends a stop signal to the executor container via HTTP request.

Args:
executor_name (str): Name of the executor container.

Returns:
Dict[str, Any]: Stop result with unified structure.
"""
try:
# Check if container exists and is owned by executor_manager
if not check_container_ownership(executor_name):
return {
"status": "unauthorized",
"error_msg": f"Container '{executor_name}' is not owned by {CONTAINER_OWNER}",
}

# Get container port
port_result = get_container_ports(executor_name)
logger.info(f"Container port info for stop: {executor_name}, {port_result}")

if port_result.get("status") != "success":
return {
"status": "failed",
"error_msg": f"Failed to get container port: {port_result.get('error_msg', 'Unknown error')}"
}

ports = port_result.get("ports", [])
if not ports:
return {
"status": "failed",
"error_msg": f"Executor {executor_name} has no exposed ports"
}

port = ports[0].get("host_port")

# Send stop request to container
stop_endpoint = f"http://{DEFAULT_DOCKER_HOST}:{port}/stop"
logger.info(f"Sending stop request to {stop_endpoint}")

try:
response = self.requests.post(stop_endpoint, timeout=5.0)
if response.status_code == 200:
logger.info(f"Successfully sent stop signal to executor {executor_name}")
return {
"status": "success",
"message": f"Stop signal sent to executor {executor_name}"
}
else:
logger.warning(f"Executor {executor_name} returned status {response.status_code}")
return {
"status": "partial",
"message": f"Stop request sent but got status {response.status_code}"
}
except self.requests.exceptions.Timeout:
# Timeout is acceptable - container might be processing the stop
logger.info(f"Stop request to {executor_name} timed out (acceptable)")
return {
"status": "success",
"message": f"Stop signal sent to executor {executor_name} (timeout)"
}
except self.requests.exceptions.RequestException as e:
logger.warning(f"Failed to send stop request to {executor_name}: {e}")
return {
"status": "failed",
"error_msg": f"Failed to send stop request: {str(e)}"
}

except Exception as e:
logger.error(f"Error stopping task in container {executor_name}: {e}")
return {
"status": "failed",
"error_msg": f"Error stopping task: {str(e)}"
}

def get_executor_count(
self, label_selector: Optional[str] = None
) -> Dict[str, Any]:
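
The container-side /stop handler that this request targets is not part of the diff. A minimal sketch of what such a handler might look like, assuming the executor container runs a small FastAPI app and records the PID of its current task; every name here is hypothetical.

import os
import signal
from typing import Optional

from fastapi import FastAPI

app = FastAPI()
current_task_pid: Optional[int] = None  # hypothetical: set by the executor when a task starts

@app.post("/stop")
def stop_current_task():
    # Terminate the running task process but keep the container (and this server)
    # alive so the same session can accept further work.
    if current_task_pid is not None:
        os.kill(current_task_pid, signal.SIGTERM)
        return {"status": "stopping"}
    return {"status": "idle"}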
18 changes: 18 additions & 0 deletions executor_manager/routers/routers.py
@@ -141,6 +141,11 @@ class DeleteExecutorRequest(BaseModel):
executor_name: str


class StopExecutorRequest(BaseModel):
executor_name: str
executor_namespace: Optional[str] = None


@app.post("/executor-manager/executor/delete")
async def delete_executor(request: DeleteExecutorRequest, http_request: Request):
try:
@@ -154,6 +154,19 @@ async def delete_executor(request: DeleteExecutorRequest, http_request: Request)
raise HTTPException(status_code=500, detail=str(e))


@app.post("/executor-manager/executor/stop")
async def stop_executor(request: StopExecutorRequest, http_request: Request):
try:
client_ip = http_request.client.host if http_request.client else "unknown"
logger.info(f"Received request to stop executor: {request.executor_name} from {client_ip}")
executor = ExecutorDispatcher.get_executor(EXECUTOR_DISPATCHER_MODE)
result = executor.stop_task(request.executor_name)
return result
except Exception as e:
logger.error(f"Error stopping executor '{request.executor_name}': {e}")
raise HTTPException(status_code=500, detail=str(e))


@app.get("/executor-manager/executor/load")
async def get_executor_load(http_request: Request):
try:
4 changes: 4 additions & 0 deletions frontend/src/apis/tasks.ts
@@ -142,6 +142,10 @@ export const taskApis = {
return apiClient.delete(`/tasks/${id}`);
},

cancelTask: async (id: number): Promise<Task> => {
return apiClient.post(`/tasks/${id}/cancel`);
},

// Get branch diff
getBranchDiff: async (params: BranchDiffRequest): Promise<BranchDiffResponse> => {
const query = new URLSearchParams();