🔒 HTTP Bearer in ADK #2802
Example Scenario: Using JWT in ADK-Python for Authenticated User CRUD

Hi community, I’d like to discuss a basic scenario and get guidance on securely handling JWT tokens within the agent flow.

Scenario

Suppose we have a tool that performs basic CRUD operations on user accounts. Only authenticated users can modify their own data. We want to use JWT tokens for authentication. The questions are:
Example Flow
I’d love to see an example or discussion about the recommended way to handle this in ADK-Python, especially how to extract the JWT payload safely and integrate it into the flow of the agent. Thanks for any guidance or examples!
Replies: 5 comments 4 replies
Response from ADK Answering Agent (experimental, answer may be inaccurate)

TLDR: Use the "User Auth" pattern with an

Hi PanchoG17,

Thanks for the great question. This is a common and important scenario for building secure agents. Here's a breakdown of the recommended way to handle JWT-based authentication in ADK-Python, addressing your specific points.

1. How to securely extract the
Have you found a solution yet?
Shouldn't the ADK agent simply be able to extract headers from the HTTP POST request?
@PanchoG17 Thank you for your question. Steps 1, 2, and 3 in your example flow need to be handled by your agent server (similar to the ADK API server), which runs the agent (e.g. via ADK's Runner). Once the user is verified, put a signal in the ADK session state. The tools that execute the CRUD operations should check the session state for that signal and reject the operation if it is missing. If your agent session is not correlated with your user session, prefix the signal's key with "temp:" so that it is not persisted.
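To make the tool-side check concrete, here is a minimal sketch of a CRUD tool that refuses to run unless the server has put a verified user id into state. It is an illustration under assumptions, not ADK's prescribed pattern: `FakeToolContext` is a stand-in for ADK's `ToolContext` (only its dict-like `state` is modeled, so the snippet runs without ADK installed), and the key name `temp:authenticated_user_id` and the tool `update_user_email` are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Any, Dict

# Hypothetical key name; the "temp:" prefix follows the suggestion above.
AUTH_KEY = "temp:authenticated_user_id"


@dataclass
class FakeToolContext:
    """Stand-in for ADK's ToolContext; only the dict-like `state` is modeled."""
    state: Dict[str, Any] = field(default_factory=dict)


def update_user_email(
    user_id: str, new_email: str, tool_context: FakeToolContext
) -> Dict[str, str]:
    """Hypothetical CRUD tool: only the authenticated user may edit their own record."""
    authenticated_id = tool_context.state.get(AUTH_KEY)

    # No signal from the agent server: reject the operation outright.
    if authenticated_id is None:
        return {"status": "error", "message": "Not authenticated."}

    # Signal present, but the model asked to modify someone else's account.
    if str(authenticated_id) != str(user_id):
        return {"status": "error", "message": "You may only modify your own account."}

    # The real database update would go here.
    return {"status": "success", "message": f"Email for user {user_id} updated to {new_email}."}
```

The point of the design is that the user id the tool trusts comes from state written by the server after verifying the JWT, never from an argument the model filled in.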
Hi! This is a simplified and untested extract from a working implementation. In this setup I integrate FastAPI with ADK agents, session persistence, and tool execution, including both MCPToolset-based tools and tools defined directly on the agent.

One of the main problems I needed to solve was how to propagate user context and authentication to tools. The approach that worked for me was to keep a login event stored in the session, updating the session state with user-related information. That state can then be accessed by tools defined on the agent itself. For MCP tools, authentication is configured separately through the toolset.

This is not meant to be the best or only solution; it’s simply the one that allowed the whole flow to work consistently in my case. The code has been reduced from a larger, working version, so there is definitely room for cleanup and improvement, but the core ideas and integration points are intentionally kept visible.

# main.py
import os
import time
import asyncio
from fastapi import FastAPI, Depends, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import DatabaseSessionService, Session
from google.adk.events import Event, EventActions
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset
from google.adk.tools.mcp_tool.mcp_session_manager import StreamableHTTPConnectionParams
from google.adk.tools.openapi_tool.auth.auth_helpers import token_to_scheme_credential
from google.genai import types
from dependencies import get_user_jwt
# --------------------------
# Basic configuration
# --------------------------
APP_NAME = "basic_adk_example"
DB_STRING = os.getenv("DB_STRING")
MCP_HOST = os.getenv("MCP_HOST")  # FastMCP server
# Global lock to avoid race conditions when mutating tool auth
_auth_lock = asyncio.Lock()
# --------------------------
# Agent definition
# --------------------------
agent = Agent(
    name="root_agent",
    model="gemini-2.0-flash",
    instruction="Answer to user and use provided tools",
    tools=[
        MCPToolset(
            connection_params=StreamableHTTPConnectionParams(
                url=f"{MCP_HOST}/mcp"
            )
        )
    ],
)
# --------------------------
# ADK services
# --------------------------
session_service = DatabaseSessionService(db_url=DB_STRING)
runner = Runner(
    agent=agent,
    app_name=APP_NAME,
    session_service=session_service,
)
# --------------------------
# FastAPI app
# --------------------------
app = FastAPI(title="ADK + FastAPI (basic example)")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
)
# --------------------------
# Session + MCP auth handling
# --------------------------
async def config_session(
    session_id: str,
    user_id: int,
    auth_header: str,
) -> Session:
    """
    - Inject JWT into MCP tools
    - Load existing ADK session
    - Append a login_event (state_delta)
    """
    # Protect shared tool state
    async with _auth_lock:
        token = auth_header.split(" ")[1]
        # Build ADK auth objects from JWT
        auth_scheme, auth_credential = token_to_scheme_credential(
            "oauth2Token",
            "header",
            "Authorization",
            token,
        )
        # Apply auth to MCP tools
        for tool in agent.tools:
            if hasattr(tool, "_connection_params"):
                tool._auth_scheme = auth_scheme
                tool._auth_credential = auth_credential
                tool._connection_params.headers = {
                    "Authorization": f"Bearer {token}"
                }
    # Load session
    session = await session_service.get_session(
        app_name=APP_NAME,
        user_id=user_id,
        session_id=session_id,
    )
    if not session:
        raise HTTPException(400, "Invalid session_id")
    # --------------------------
    # Register login event
    # --------------------------
    # Note about login_event:
    # The login_event is still present, but it's not strictly required. It was
    # part of an earlier approach to propagate user context into the session,
    # and I left it in place for now.
    # With MCPToolset, token-based auth already covers most of this, so this
    # event could likely be removed or simplified.
    state_changes = {
        "task_status": "active",
        "user_id": user_id,  # a "temp:"-prefixed key didn't work for me
    }
    login_event = Event(
        invocation_id="login_event",
        author="root_agent",
        actions=EventActions(state_delta=state_changes),
        timestamp=time.time(),
    )
    await session_service.append_event(session, login_event)
    return session
# --------------------------
# Chat endpoint
# --------------------------
@app.post("/chat")
async def chat(request: Request, user_id: int = Depends(get_user_jwt)):
    body = await request.json()
    message = body["message"]
    session_id = body["session_id"]
    auth_header = request.headers.get("Authorization")
    # Configure session and MCP auth
    session = await config_session(session_id, user_id, auth_header)
    # Build user message
    content = types.Content(
        role="user",
        parts=[types.Part(text=message)],
    )
    # Run agent
    events = runner.run_async(
        user_id=user_id,
        session_id=session.id,
        new_message=content,
    )
    final_response = ""
    async for event in events:
        if event.is_final_response() and event.content:
            final_response = event.content.parts[0].text
    return {"response": final_response}
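The `get_user_jwt` dependency imported from `dependencies` is not shown above, and the listing reads the token with `auth_header.split(" ")[1]`, which fails on a missing or malformed header. As a rough sketch of what the verification step might look like, the snippet below parses the `Authorization` header defensively and checks an HS256 signature and the `exp` claim before trusting the payload. Everything in it is an assumption for illustration: the function names (`extract_bearer_token`, `verify_hs256`, `sign_hs256`), the choice of HS256, and the shared secret. It uses only the standard library so it runs as-is; in a real service a maintained JWT library such as PyJWT is probably the better choice.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Any, Dict, Optional


def _b64url_decode(segment: str) -> bytes:
    # JWT segments are base64url without padding; restore it before decoding.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def extract_bearer_token(auth_header: Optional[str]) -> str:
    """Return the token from 'Bearer <token>' or raise ValueError."""
    if not auth_header:
        raise ValueError("missing Authorization header")
    scheme, _, token = auth_header.partition(" ")
    if scheme.lower() != "bearer" or not token.strip():
        raise ValueError("expected 'Bearer <token>'")
    return token.strip()


def sign_hs256(payload: Dict[str, Any], secret: bytes) -> str:
    """Build a token (used here only to demonstrate verification)."""
    header_b64 = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload_b64 = _b64url_encode(json.dumps(payload).encode())
    signing_input = f"{header_b64}.{payload_b64}".encode("ascii")
    signature = hmac.new(secret, signing_input, hashlib.sha256).digest()
    return f"{header_b64}.{payload_b64}.{_b64url_encode(signature)}"


def verify_hs256(token: str, secret: bytes, now: Optional[float] = None) -> Dict[str, Any]:
    """Verify signature and expiry, then return the payload; raise ValueError otherwise."""
    header_b64, payload_b64, sig_b64 = token.split(".")  # ValueError if not 3 parts
    header = json.loads(_b64url_decode(header_b64))
    # Pin the algorithm instead of trusting whatever the token claims.
    if not isinstance(header, dict) or header.get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    signing_input = f"{header_b64}.{payload_b64}".encode("ascii")
    expected = hmac.new(secret, signing_input, hashlib.sha256).digest()
    # Constant-time comparison; the payload is only parsed after this passes.
    if not hmac.compare_digest(_b64url_decode(sig_b64), expected):
        raise ValueError("bad signature")
    payload = json.loads(_b64url_decode(payload_b64))
    if not isinstance(payload, dict):
        raise ValueError("payload is not a JSON object")
    exp = payload.get("exp")
    current = time.time() if now is None else now
    if isinstance(exp, (int, float)) and current >= exp:
        raise ValueError("token expired")
    return payload
```

A FastAPI dependency could call `extract_bearer_token` on the request header, pass the result to `verify_hs256`, translate any `ValueError` into a 401 response, and return the `sub` claim as the user id.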