Google ADK Masterclass Part 8: Callbacks and Agent Lifecycle
In our previous tutorial, we explored building multi-agent systems where specialized agents collaborate to solve complex problems. Now, we’ll dive into one of ADK’s most powerful but lesser-known features: callbacks.
Callbacks allow you to hook into every stage of an agent’s execution lifecycle, enabling you to monitor, modify, and extend agent behavior. This capability is essential for advanced applications, providing fine-grained control over the agent experience.
Understanding the Agent Lifecycle
Before implementing callbacks, it’s important to understand the agent lifecycle in ADK. When a user interacts with an agent, the following events occur in sequence:
- Pre-Run: Before any processing begins
- Agent Selection: Determining which agent will handle the request
- Pre-LLM Call: Before sending the prompt to the language model
- LLM Response: After receiving a response from the language model
- Pre-Tool Call: Before executing a tool
- Tool Response: After receiving a response from a tool
- Post-Run: After all processing is complete
Each of these stages represents an opportunity to intervene in the agent’s execution process.
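Concretely, each stage corresponds to an async callback hook. A minimal no-op handler looks like this (the signature mirrors the handlers we'll write below; returning None leaves the run unchanged):
async def on_llm_call(runner, user_id, session_id, agent_name, prompt, **kwargs):
    """A do-nothing hook for the pre-LLM stage."""
    print(f"[{agent_name}] about to call the LLM")
    return None  # return a dict of modifications to alter behavior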
Why Use Callbacks?
Callbacks serve several important purposes:
- Monitoring: Track usage, performance, and behavior
- Logging: Record detailed information about agent interactions
- Modification: Alter prompts, responses, or tool calls
- Security: Implement safety filters or rate limiting
- Integration: Connect with external systems
- Debugging: Gain insights into agent behavior
Implementing Basic Callbacks
Let’s build a callback system that logs details at each stage of the agent lifecycle:
Folder Structure
callbacks/
├── main.py
├── utils.py
├── .env
└── logger_agent/
    ├── __init__.py
    └── agent.py
Agent Implementation (logger_agent/agent.py)
from google.adk import Agent
from google.adk.tools import FunctionTool
from datetime import datetime


def get_current_time() -> dict:
    """
    Returns the current time.
    Use this when the user asks for the current time.
    """
    now = datetime.now()
    return {
        "current_time": now.strftime("%H:%M:%S"),
        "current_date": now.strftime("%Y-%m-%d"),
        "timezone": now.astimezone().tzname()
    }


logger_agent = Agent(
    name="logger_agent",
    model="models/gemini-2.0-flash",
    description="A helpful agent that demonstrates callbacks",
    instruction="""
    You are a helpful assistant that can provide information and answer questions.
    When asked about the current time, use the get_current_time tool.
    Be friendly and conversational in your responses.
    """,
    tools=[FunctionTool(get_current_time)]
)
Callback Implementation (main.py)
import os
import asyncio
import uuid
import time
import json
from datetime import datetime
from dotenv import load_dotenv
from google.adk.orchestration import Runner
from google.adk.orchestration.session import InMemorySessionService
from google.generativeai.types import content_types
from google.generativeai.types.content_types import Part
from logger_agent.agent import logger_agent
# Load environment variables
load_dotenv()
# Create a log file
log_file = "agent_logs.jsonl"
class CallbackLogger:
"""Callback handler that logs details at each stage of the agent lifecycle."""
def __init__(self, log_file):
self.log_file = log_file
self.start_time = None
self.execution_id = None
def log_event(self, event_type, details=None):
"""Log an event to the log file."""
timestamp = datetime.now().isoformat()
log_entry = {
"timestamp": timestamp,
"execution_id": self.execution_id,
"event_type": event_type,
"details": details or {}
}
with open(self.log_file, "a") as f:
f.write(json.dumps(log_entry) + "\n")
async def on_run_start(self, runner, user_id, session_id, content, **kwargs):
"""Called before the agent starts processing."""
self.start_time = time.time()
self.execution_id = str(uuid.uuid4())
# Extract the user message
user_message = None
for part in content.parts:
if hasattr(part, 'text'):
user_message = part.text
break
self.log_event("run_start", {
"user_id": user_id,
"session_id": session_id,
"user_message": user_message
})
async def on_agent_selection(self, runner, user_id, session_id, agent_name, **kwargs):
"""Called when an agent is selected to handle the request."""
self.log_event("agent_selection", {
"agent_name": agent_name
})
async def on_llm_call(self, runner, user_id, session_id, agent_name,
prompt, **kwargs):
"""Called before sending a prompt to the language model."""
# Note: In a real app, you might want to sanitize or truncate the prompt
# as it could be very large
self.log_event("llm_call", {
"agent_name": agent_name,
"prompt_length": len(str(prompt))
})
async def on_llm_response(self, runner, user_id, session_id, agent_name,
prompt, response, **kwargs):
"""Called after receiving a response from the language model."""
self.log_event("llm_response", {
"agent_name": agent_name,
"response_length": len(str(response))
})
async def on_tool_call(self, runner, user_id, session_id, agent_name,
tool_name, tool_params, **kwargs):
"""Called before executing a tool."""
self.log_event("tool_call", {
"agent_name": agent_name,
"tool_name": tool_name,
"tool_params": tool_params
})
async def on_tool_response(self, runner, user_id, session_id, agent_name,
tool_name, tool_params, tool_response, **kwargs):
"""Called after receiving a response from a tool."""
self.log_event("tool_response", {
"agent_name": agent_name,
"tool_name": tool_name,
"tool_response": tool_response
})
async def on_run_end(self, runner, user_id, session_id, content, **kwargs):
"""Called after the agent completes processing."""
execution_time = time.time() - self.start_time
# Extract the final response
agent_response = None
if content and hasattr(content, 'parts'):
for part in content.parts:
if hasattr(part, 'text'):
agent_response = part.text
break
self.log_event("run_end", {
"execution_time_seconds": execution_time,
"agent_response": agent_response
})
async def process_user_input(runner, user_id, session_id, query):
"""Process a user query through the agent."""
print(f"\nYou: {query}")
# Create content from the user query
content = content_types.Content(
role="user",
parts=[Part.from_text(query)]
)
# Run the agent with the user query
response = await runner.run_async(
user_id=user_id,
session_id=session_id,
content=content
)
# Process the response
final_response_text = None
for event in response.events:
if event.type == "content" and event.content.role == "agent":
final_response_text = event.content.parts[0].text
print(f"\nAgent: {final_response_text}")
return final_response_text
async def main():
# Clear or create the log file
with open(log_file, "w") as f:
f.write("")
# Create a session service
session_service = InMemorySessionService()
# Create a callback logger
callback_logger = CallbackLogger(log_file)
# Create a session
session_id = str(uuid.uuid4())
session = session_service.create_session(
app_name="CallbackDemo",
user_id="example_user",
session_id=session_id
)
# Create a runner with callbacks
runner = Runner(
root_agent=logger_agent,
session_service=session_service,
callbacks=[
callback_logger.on_run_start,
callback_logger.on_agent_selection,
callback_logger.on_llm_call,
callback_logger.on_llm_response,
callback_logger.on_tool_call,
callback_logger.on_tool_response,
callback_logger.on_run_end
]
)
# Interactive chat loop
print("\nCallback Demo")
print("Type 'exit' or 'quit' to end the conversation")
print("--------------------------------------------------------")
while True:
user_input = input("\nYou: ")
if user_input.lower() in ["exit", "quit"]:
print("Goodbye! Check agent_logs.jsonl for the interaction logs.")
break
# Process the user input
await process_user_input(runner, "example_user", session_id, user_input)
if __name__ == "__main__":
asyncio.run(main())
Running the Example
Run the example with:
cd callbacks
python main.py
Example interaction:
Callback Demo
Type 'exit' or 'quit' to end the conversation
--------------------------------------------------------
You: Hello, what can you do?
Agent: Hello! I'm a helpful assistant designed to answer your questions and provide information on various topics. I can chat with you, provide explanations, help with general knowledge questions, and even tell you the current time if you ask me. Is there something specific you'd like to know or discuss today?
You: What time is it now?
Agent: I've checked the current time for you. It's 14:32:07 on 2025-05-10 in the PDT timezone.
You: quit
Goodbye! Check agent_logs.jsonl for the interaction logs.
After the interaction, you can examine the log file (agent_logs.jsonl) to see detailed information about each stage of the agent lifecycle. Here’s a sample of what you might see:
{"timestamp": "2025-05-10T14:30:45.123456", "execution_id": "550e8400-e29b-41d4-a716-446655440000", "event_type": "run_start", "details": {"user_id": "example_user", "session_id": "8a7b6c5d-4e3f-2d1c-0b9a-8a7b6c5d4e3f", "user_message": "Hello, what can you do?"}}
{"timestamp": "2025-05-10T14:30:45.234567", "execution_id": "550e8400-e29b-41d4-a716-446655440000", "event_type": "agent_selection", "details": {"agent_name": "logger_agent"}}
{"timestamp": "2025-05-10T14:30:45.345678", "execution_id": "550e8400-e29b-41d4-a716-446655440000", "event_type": "llm_call", "details": {"agent_name": "logger_agent", "prompt_length": 1523}}
{"timestamp": "2025-05-10T14:30:46.456789", "execution_id": "550e8400-e29b-41d4-a716-446655440000", "event_type": "llm_response", "details": {"agent_name": "logger_agent", "response_length": 329}}
{"timestamp": "2025-05-10T14:30:46.567890", "execution_id": "550e8400-e29b-41d4-a716-446655440000", "event_type": "run_end", "details": {"execution_time_seconds": 1.444434, "agent_response": "Hello! I'm a helpful assistant designed to answer your questions and provide information on various topics..."}}
Advanced Callback Techniques
Now that we understand the basics, let’s explore some more advanced callback techniques.
1. Prompt Modification
You can modify prompts before they’re sent to the language model:
async def on_llm_call(self, runner, user_id, session_id, agent_name,
prompt, **kwargs):
"""Modify prompt before sending to the language model."""
# Add additional context to every prompt
if isinstance(prompt, str):
enhanced_prompt = f"Remember to maintain a friendly, conversational tone. {prompt}"
return {"prompt": enhanced_prompt}
return None
2. Response Filtering
You can filter or modify responses from the language model:
async def on_llm_response(self, runner, user_id, session_id, agent_name,
prompt, response, **kwargs):
"""Filter responses from the language model."""
# Check for and remove controversial content
if isinstance(response, str) and "controversial_topic" in response.lower():
filtered_response = "I apologize, but I cannot provide information on that topic."
return {"response": filtered_response}
return None
3. Tool Interception
You can intercept and modify tool calls:
async def on_tool_call(self, runner, user_id, session_id, agent_name,
                       tool_name, tool_params, **kwargs):
    """Intercept tool calls."""
    # Add rate limiting for expensive API calls
    # (self.api_call_count and self.max_api_calls are set in __init__)
    if tool_name == "expensive_api_call":
        # Check if we've exceeded our quota
        if self.api_call_count >= self.max_api_calls:
            return {
                "tool_response": {
                    "error": "API call quota exceeded",
                    "message": "Please try again later."
                }
            }
        self.api_call_count += 1
    return None
4. Usage Analytics
You can collect detailed analytics about agent usage:
class AnalyticsCallback:
def __init__(self):
self.sessions = {}
self.tool_usage = {}
self.response_times = []
async def on_run_start(self, runner, user_id, session_id, content, **kwargs):
self.start_time = time.time()
# Track unique sessions
if session_id not in self.sessions:
self.sessions[session_id] = {
"user_id": user_id,
"message_count": 0,
"first_seen": datetime.now().isoformat()
}
self.sessions[session_id]["message_count"] += 1
self.sessions[session_id]["last_seen"] = datetime.now().isoformat()
async def on_tool_call(self, runner, user_id, session_id, agent_name,
tool_name, tool_params, **kwargs):
# Track tool usage
if tool_name not in self.tool_usage:
self.tool_usage[tool_name] = 0
self.tool_usage[tool_name] += 1
async def on_run_end(self, runner, user_id, session_id, content, **kwargs):
# Track response times
execution_time = time.time() - self.start_time
self.response_times.append(execution_time)
def get_analytics_report(self):
"""Generate a report of usage analytics."""
return {
"unique_sessions": len(self.sessions),
"total_messages": sum(s["message_count"] for s in self.sessions.values()),
"tool_usage": self.tool_usage,
"avg_response_time": sum(self.response_times) / len(self.response_times) if self.response_times else 0,
"active_users": len(set(s["user_id"] for s in self.sessions.values()))
}
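To surface these metrics, print the report once the chat loop ends. A sketch of the wiring, using the same Runner conventions as the example above:
analytics = AnalyticsCallback()
runner = Runner(
    root_agent=logger_agent,
    session_service=session_service,
    callbacks=[
        analytics.on_run_start,
        analytics.on_tool_call,
        analytics.on_run_end
    ]
)

# ... run the chat loop ...

# Dump the aggregated usage report
print(json.dumps(analytics.get_analytics_report(), indent=2))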
5. Error Handling
You can add error handling to gracefully recover from failures:
class ErrorHandlingCallback:
    async def on_llm_call(self, runner, user_id, session_id, agent_name,
                          prompt, **kwargs):
        try:
            # Put your real pre-processing or validation logic here;
            # any exception it raises is caught below
            return None
        except Exception as e:
            print(f"Error in LLM call: {e}")
            # Return a graceful error message
            return {
                "response": "I'm having trouble processing your request right now. Could you try again?"
            }

    async def on_tool_call(self, runner, user_id, session_id, agent_name,
                           tool_name, tool_params, **kwargs):
        try:
            # Put your real parameter validation or setup logic here;
            # any exception it raises is caught below
            return None
        except Exception as e:
            print(f"Error in tool call {tool_name}: {e}")
            # Return a graceful error message for the tool
            return {
                "tool_response": {
                    "error": f"Error executing {tool_name}",
                    "message": "The operation couldn't be completed."
                }
            }
Combining Multiple Callbacks
In real applications, you’ll often want to use multiple callbacks together. ADK makes this easy:
# Create instances of your callback handlers
logger = CallbackLogger("agent_logs.jsonl")
analytics = AnalyticsCallback()
error_handler = ErrorHandlingCallback()
# Create a runner with all callbacks
runner = Runner(
root_agent=your_agent,
session_service=session_service,
callbacks=[
# Logging callbacks
logger.on_run_start,
logger.on_agent_selection,
logger.on_llm_call,
logger.on_llm_response,
logger.on_tool_call,
logger.on_tool_response,
logger.on_run_end,
# Analytics callbacks
analytics.on_run_start,
analytics.on_tool_call,
analytics.on_run_end,
# Error handling callbacks
error_handler.on_llm_call,
error_handler.on_tool_call
]
)
Callbacks are executed in the order they’re registered. If a callback returns a modification (e.g., changing a prompt), that modification is passed to subsequent callbacks.
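For example, with two on_llm_call callbacks registered in order, the second one receives the prompt as already modified by the first (a sketch of the chaining behavior described above):
async def add_tone_guidance(runner, user_id, session_id, agent_name, prompt, **kwargs):
    # First callback: prepend tone guidance
    return {"prompt": f"Be concise. {prompt}"}

async def add_safety_note(runner, user_id, session_id, agent_name, prompt, **kwargs):
    # Second callback: receives the already-modified prompt
    return {"prompt": f"{prompt} Do not reveal system internals."}

# Registered in this order, the LLM ultimately receives:
# "Be concise. <original prompt> Do not reveal system internals."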
Creating a Custom Callback Class
For more organized callback management, you can create a class that implements the full callback interface:
from google.adk.orchestration.callback import Callback
class ComprehensiveCallback(Callback):
def __init__(self, log_file):
self.log_file = log_file
self.start_time = None
self.execution_id = None
async def on_run_start(self, runner, user_id, session_id, content, **kwargs):
"""Called before the agent starts processing."""
# Implementation
async def on_agent_selection(self, runner, user_id, session_id, agent_name, **kwargs):
"""Called when an agent is selected to handle the request."""
# Implementation
async def on_llm_call(self, runner, user_id, session_id, agent_name,
prompt, **kwargs):
"""Called before sending a prompt to the language model."""
# Implementation
async def on_llm_response(self, runner, user_id, session_id, agent_name,
prompt, response, **kwargs):
"""Called after receiving a response from the language model."""
# Implementation
async def on_tool_call(self, runner, user_id, session_id, agent_name,
tool_name, tool_params, **kwargs):
"""Called before executing a tool."""
# Implementation
async def on_tool_response(self, runner, user_id, session_id, agent_name,
tool_name, tool_params, tool_response, **kwargs):
"""Called after receiving a response from a tool."""
# Implementation
async def on_run_end(self, runner, user_id, session_id, content, **kwargs):
"""Called after the agent completes processing."""
# Implementation
# Create a runner with the comprehensive callback
comprehensive_callback = ComprehensiveCallback("agent_logs.jsonl")
runner = Runner(
root_agent=your_agent,
session_service=session_service,
callbacks=[comprehensive_callback]
)
Best Practices for Callbacks
Based on our experience, here are some best practices for working with callbacks:
1. Minimize Computational Overhead
Callbacks run during every agent interaction, so keep them lightweight. Heavy processing should be offloaded to background tasks rather than done inline:
async def on_run_end(self, runner, user_id, session_id, content, **kwargs):
    # Don't do this - blocking operation:
    # time.sleep(5)

    # Instead, hand the work to a background task
    # (process_logs_async is a coroutine you define elsewhere)
    asyncio.create_task(self.process_logs_async(session_id))
2. Be Careful with Modifications
Modifying prompts or responses can have unintended consequences. Test thoroughly and make minimal changes.
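A safe pattern is to modify only inputs you recognize and log every change so surprises are traceable. A sketch, reusing the log_event helper from CallbackLogger above:
async def on_llm_call(self, runner, user_id, session_id, agent_name,
                      prompt, **kwargs):
    if not isinstance(prompt, str):
        return None  # only modify what we understand
    modified = prompt.replace("internal_codename", "the product")
    if modified != prompt:
        # Keep an audit trail of every modification
        self.log_event("prompt_modified", {
            "original_length": len(prompt),
            "modified_length": len(modified)
        })
        return {"prompt": modified}
    return None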
3. Handle Exceptions Gracefully
Always use try/except blocks to prevent callback errors from breaking the agent lifecycle:
async def on_llm_call(self, runner, user_id, session_id, agent_name,
prompt, **kwargs):
try:
# Your callback logic
self.log_event("llm_call", {"agent_name": agent_name})
return None
except Exception as e:
print(f"Error in on_llm_call callback: {e}")
# Don't disrupt the agent - return None to continue normally
return None
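Rather than repeating the try/except in every handler, you can wrap each callback once in a small decorator (a sketch; it relies on None meaning "no modification", as above):
import functools

def safe_callback(func):
    """Wrap a callback so its exceptions never break the agent run."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            print(f"Error in {func.__name__} callback: {e}")
            return None  # continue the run unmodified
    return wrapper

# Usage:
# callbacks=[safe_callback(logger.on_llm_call), ...]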
4. Use Structured Logging
Log in structured formats (like JSON) for easier analysis and integration with monitoring tools:
def log_event(self, event_type, details=None):
log_entry = {
"timestamp": datetime.now().isoformat(),
"execution_id": self.execution_id,
"event_type": event_type,
"details": details or {}
}
# Log to both file and monitoring service
with open(self.log_file, "a") as f:
f.write(json.dumps(log_entry) + "\n")
# Also send to monitoring service
if self.monitoring_service:
asyncio.create_task(self.monitoring_service.send(log_entry))
5. Use Callback States Wisely
You can use callback objects to maintain state across callback stages, but be careful with concurrency:
class CallbackWithState:
def __init__(self):
        # Key state by execution_id so concurrent runs don't overwrite each other
self.states = {}
async def on_run_start(self, runner, user_id, session_id, content, **kwargs):
execution_id = str(uuid.uuid4())
self.states[execution_id] = {
"start_time": time.time(),
"user_id": user_id,
"session_id": session_id
}
return {"execution_id": execution_id}
async def on_run_end(self, runner, user_id, session_id, content, execution_id=None, **kwargs):
if execution_id and execution_id in self.states:
state = self.states[execution_id]
duration = time.time() - state["start_time"]
print(f"Execution took {duration} seconds")
# Clean up
del self.states[execution_id]
Real-World Applications of Callbacks
Here are some practical applications of callbacks in production environments:
1. Content Moderation
class ContentModerationCallback:
def __init__(self, moderation_service):
self.moderation_service = moderation_service
async def on_llm_response(self, runner, user_id, session_id, agent_name,
prompt, response, **kwargs):
# Check response against moderation rules
if isinstance(response, str):
moderation_result = await self.moderation_service.check_content(response)
if moderation_result.flagged:
# Replace with safe content
return {"response": "I apologize, but I cannot provide that information."}
return None
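The moderation_service is whatever checker you plug in. For local testing, a hypothetical keyword-based stub is enough to exercise the callback:
from types import SimpleNamespace

class KeywordModerationService:
    """A toy stand-in for a real moderation API (hypothetical)."""
    def __init__(self, blocked_terms):
        self.blocked_terms = [t.lower() for t in blocked_terms]

    async def check_content(self, text):
        flagged = any(term in text.lower() for term in self.blocked_terms)
        return SimpleNamespace(flagged=flagged)

moderation = ContentModerationCallback(KeywordModerationService(["ssn", "password"]))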
2. Rate Limiting
class RateLimitingCallback:
def __init__(self):
self.user_requests = {}
self.max_requests_per_minute = 10
async def on_run_start(self, runner, user_id, session_id, content, **kwargs):
now = time.time()
minute_ago = now - 60
# Initialize if new user
if user_id not in self.user_requests:
self.user_requests[user_id] = []
# Remove requests older than 1 minute
self.user_requests[user_id] = [
timestamp for timestamp in self.user_requests[user_id]
if timestamp > minute_ago
]
# Check if user exceeds rate limit
if len(self.user_requests[user_id]) >= self.max_requests_per_minute:
# Return an early response to avoid LLM call
return {
"content": content_types.Content(
role="agent",
parts=[Part.from_text(
"You've exceeded the rate limit. Please try again shortly."
)]
)
}
# Add this request to the log
self.user_requests[user_id].append(now)
return None
3. Cost Tracking
class CostTrackingCallback:
def __init__(self):
self.costs = {
"llm_calls": 0,
"tool_calls": {}
}
# Cost per 1K tokens (example rates)
        self.llm_rates = {
            "models/gemini-2.0-flash-lite": 0.0001,  # $0.0001 per 1K tokens
            "models/gemini-2.0-flash": 0.00015       # $0.00015 per 1K tokens
        }
self.tool_rates = {
"search_tool": 0.01, # $0.01 per call
"weather_tool": 0.005 # $0.005 per call
}
async def on_llm_call(self, runner, user_id, session_id, agent_name,
prompt, **kwargs):
# Estimate token count (simplified)
token_count = len(str(prompt)) / 4 # Very rough estimate
# Get model info
agent = runner.get_agent(agent_name)
model_name = agent.model if hasattr(agent, 'model') else "unknown"
# Calculate cost
rate = self.llm_rates.get(model_name, 0.0002) # Default rate if unknown
cost = (token_count / 1000) * rate
# Add to total
self.costs["llm_calls"] += cost
async def on_tool_call(self, runner, user_id, session_id, agent_name,
tool_name, tool_params, **kwargs):
# Track tool usage cost
if tool_name not in self.costs["tool_calls"]:
self.costs["tool_calls"][tool_name] = 0
rate = self.tool_rates.get(tool_name, 0.001) # Default rate if unknown
self.costs["tool_calls"][tool_name] += rate
def get_total_cost(self):
tool_cost = sum(self.costs["tool_calls"].values())
return {
"llm_cost": self.costs["llm_calls"],
"tool_cost": tool_cost,
"total_cost": self.costs["llm_calls"] + tool_cost
}
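After a session ends, the accumulated totals can be printed or forwarded to a billing dashboard:
cost_tracker = CostTrackingCallback()
# ... register cost_tracker.on_llm_call and cost_tracker.on_tool_call
#     with the Runner, then run the conversation ...

report = cost_tracker.get_total_cost()
print(f"LLM cost:   ${report['llm_cost']:.4f}")
print(f"Tool cost:  ${report['tool_cost']:.4f}")
print(f"Total cost: ${report['total_cost']:.4f}")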
Conclusion
Callbacks are a powerful mechanism for extending and controlling agent behavior in ADK. By hooking into the agent lifecycle, you can monitor, modify, and enhance the agent experience in countless ways. From simple logging to complex integrations with external systems, callbacks provide the flexibility needed for production-grade agent applications.
In this tutorial, we’ve covered:
- Understanding the agent lifecycle
- Implementing basic logging callbacks
- Advanced callback techniques for prompt modification and response filtering
- Best practices for efficient and reliable callbacks
- Real-world applications like content moderation and cost tracking
In the next part of our series, we’ll explore sequential workflows in ADK, allowing you to create complex, multi-step agent interactions with predictable execution paths.
Agent Lifecycle Sequence Diagram
The Mermaid sequence diagram below summarizes where each callback fires during a single run:
sequenceDiagram
participant User
participant Runner
participant Callbacks
participant Agent
participant LLM
participant Tool
User->>Runner: Send message
activate Runner
Runner->>Callbacks: on_run_start
Runner->>Callbacks: on_agent_selection
Runner->>Agent: Select agent
activate Agent
Agent->>Callbacks: on_llm_call
Agent->>LLM: Send prompt
activate LLM
LLM-->>Agent: Response
deactivate LLM
Agent->>Callbacks: on_llm_response
opt Tool Call Needed
Agent->>Callbacks: on_tool_call
Agent->>Tool: Execute tool
activate Tool
Tool-->>Agent: Result
deactivate Tool
Agent->>Callbacks: on_tool_response
end
Agent-->>Runner: Final response
deactivate Agent
Runner->>Callbacks: on_run_end
Runner-->>User: Return response
deactivate Runner