Files
2025-11-30 08:51:46 +08:00

18 KiB

name, description, category
name description category
agent-orchestration-patterns Automatically applies when designing multi-agent systems. Ensures proper tool schema design with Pydantic, agent state management, error handling for tool execution, and orchestration patterns. ai-llm

Agent Orchestration Patterns

When building multi-agent systems and tool-calling workflows, follow these patterns for reliable, maintainable orchestration.

Trigger Keywords: agent, multi-agent, tool calling, orchestration, subagent, tool schema, function calling, agent state, agent routing, agent graph, LangChain, LlamaIndex, Anthropic tools

Agent Integration: Used by ml-system-architect, agent-orchestrator-engineer, llm-app-engineer, security-and-privacy-engineer-ml

Correct Pattern: Tool Schema with Pydantic

from pydantic import BaseModel, Field
from typing import List, Literal, Optional
from enum import Enum


class SearchQuery(BaseModel):
    """Tool input for search."""
    query: str = Field(..., description="Search query string")
    max_results: int = Field(
        10,
        ge=1,
        le=100,
        description="Maximum number of results to return"
    )
    filter_domain: Optional[str] = Field(
        None,
        description="Optional domain to filter results (e.g., 'python.org')"
    )


class SearchResult(BaseModel):
    """Individual search result."""
    title: str
    url: str
    snippet: str
    relevance_score: float = Field(ge=0.0, le=1.0)


class SearchResponse(BaseModel):
    """Tool output for search."""
    results: List[SearchResult]
    total_found: int
    query_time_ms: float


async def search_tool(input: SearchQuery) -> SearchResponse:
    """
    Search the web and return relevant results.

    Args:
        input: Validated search parameters

    Returns:
        Search results with metadata

    Example:
        >>> result = await search_tool(SearchQuery(
        ...     query="Python async patterns",
        ...     max_results=5
        ... ))
        >>> print(result.results[0].title)
    """
    # Implementation
    results = await perform_search(
        query=input.query,
        limit=input.max_results,
        domain_filter=input.filter_domain
    )

    return SearchResponse(
        results=results,
        total_found=len(results),
        query_time_ms=123.45
    )


# Convert to Claude tool schema
def tool_to_anthropic_schema(func, input_model: type[BaseModel]) -> dict:
    """Convert Pydantic model to Anthropic tool schema."""
    return {
        "name": func.__name__.replace("_tool", ""),
        "description": func.__doc__.strip().split("\n")[0],
        "input_schema": input_model.model_json_schema()
    }


# Register tool
SEARCH_TOOL = tool_to_anthropic_schema(search_tool, SearchQuery)

Agent State Management

from typing import List, Dict, Any, Optional
from datetime import datetime
from pydantic import BaseModel, Field
import uuid


class Message(BaseModel):
    """A single message in conversation."""
    role: Literal["user", "assistant", "system"]
    content: str
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    metadata: Dict[str, Any] = Field(default_factory=dict)


class ToolCall(BaseModel):
    """Record of a tool execution."""
    tool_name: str
    input: Dict[str, Any]
    output: Any
    duration_ms: float
    success: bool
    error: Optional[str] = None
    timestamp: datetime = Field(default_factory=datetime.utcnow)


class AgentState(BaseModel):
    """State for an agent conversation."""
    session_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    messages: List[Message] = Field(default_factory=list)
    tool_calls: List[ToolCall] = Field(default_factory=list)
    metadata: Dict[str, Any] = Field(default_factory=dict)
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)

    def add_message(self, role: str, content: str, **metadata):
        """Add message to conversation history."""
        self.messages.append(
            Message(role=role, content=content, metadata=metadata)
        )
        self.updated_at = datetime.utcnow()

    def add_tool_call(self, tool_call: ToolCall):
        """Record tool execution."""
        self.tool_calls.append(tool_call)
        self.updated_at = datetime.utcnow()

    def get_conversation_history(self) -> List[Dict[str, str]]:
        """Get messages in format for LLM API."""
        return [
            {"role": msg.role, "content": msg.content}
            for msg in self.messages
            if msg.role != "system"
        ]


class AgentStateManager:
    """Manage agent states with persistence."""

    def __init__(self):
        self._states: Dict[str, AgentState] = {}

    async def get_or_create(self, session_id: str | None = None) -> AgentState:
        """Get existing state or create new one."""
        if session_id and session_id in self._states:
            return self._states[session_id]

        state = AgentState(session_id=session_id or str(uuid.uuid4()))
        self._states[state.session_id] = state
        return state

    async def save(self, state: AgentState):
        """Persist agent state."""
        self._states[state.session_id] = state
        # Could also save to database/redis here

    async def load(self, session_id: str) -> Optional[AgentState]:
        """Load agent state from storage."""
        return self._states.get(session_id)

Tool Execution with Error Handling

from typing import Callable, Any, Type
import asyncio
import logging
from datetime import datetime

logger = logging.getLogger(__name__)


class ToolError(Exception):
    """Base tool execution error."""
    pass


class ToolTimeoutError(ToolError):
    """Tool execution timeout."""
    pass


class ToolValidationError(ToolError):
    """Tool input validation error."""
    pass


class ToolExecutor:
    """Execute tools with validation and error handling."""

    def __init__(self, timeout: float = 30.0):
        self.timeout = timeout
        self.tools: Dict[str, tuple[Callable, Type[BaseModel]]] = {}

    def register_tool(
        self,
        name: str,
        func: Callable,
        input_model: Type[BaseModel]
    ):
        """Register a tool with its input schema."""
        self.tools[name] = (func, input_model)

    async def execute(
        self,
        tool_name: str,
        tool_input: Dict[str, Any]
    ) -> ToolCall:
        """
        Execute tool with validation and error handling.

        Args:
            tool_name: Name of tool to execute
            tool_input: Raw input dict from LLM

        Returns:
            ToolCall record with result or error

        Raises:
            ToolError: If tool execution fails unrecoverably
        """
        if tool_name not in self.tools:
            error_msg = f"Unknown tool: {tool_name}"
            logger.error(error_msg)
            return ToolCall(
                tool_name=tool_name,
                input=tool_input,
                output=None,
                duration_ms=0.0,
                success=False,
                error=error_msg
            )

        func, input_model = self.tools[tool_name]
        start_time = datetime.utcnow()

        try:
            # Validate input
            try:
                validated_input = input_model(**tool_input)
            except Exception as e:
                raise ToolValidationError(
                    f"Invalid input for {tool_name}: {str(e)}"
                ) from e

            # Execute with timeout
            try:
                output = await asyncio.wait_for(
                    func(validated_input),
                    timeout=self.timeout
                )
            except asyncio.TimeoutError:
                raise ToolTimeoutError(
                    f"Tool {tool_name} exceeded timeout of {self.timeout}s"
                )

            duration_ms = (datetime.utcnow() - start_time).total_seconds() * 1000

            logger.info(
                f"Tool executed successfully",
                extra={
                    "tool_name": tool_name,
                    "duration_ms": duration_ms
                }
            )

            return ToolCall(
                tool_name=tool_name,
                input=tool_input,
                output=output,
                duration_ms=duration_ms,
                success=True
            )

        except ToolError as e:
            duration_ms = (datetime.utcnow() - start_time).total_seconds() * 1000

            logger.error(
                f"Tool execution failed",
                extra={
                    "tool_name": tool_name,
                    "error": str(e),
                    "duration_ms": duration_ms
                }
            )

            return ToolCall(
                tool_name=tool_name,
                input=tool_input,
                output=None,
                duration_ms=duration_ms,
                success=False,
                error=str(e)
            )

Agent Orchestration Patterns

Pattern 1: Sequential Agent Chain

from typing import List


class SequentialOrchestrator:
    """Execute agents in sequence, passing output to next."""

    def __init__(self, agents: List[Callable]):
        self.agents = agents

    async def run(self, initial_input: str) -> str:
        """
        Run agents sequentially.

        Args:
            initial_input: Input for first agent

        Returns:
            Output from final agent
        """
        current_input = initial_input

        for i, agent in enumerate(self.agents):
            logger.info(f"Running agent {i + 1}/{len(self.agents)}")
            current_input = await agent(current_input)

        return current_input


# Example usage
async def research_agent(query: str) -> str:
    """Research a topic."""
    # Search and gather information
    return "research results..."


async def synthesis_agent(research: str) -> str:
    """Synthesize research into summary."""
    # Analyze and synthesize
    return "synthesized summary..."


async def writer_agent(summary: str) -> str:
    """Write final article."""
    # Generate polished content
    return "final article..."


# Chain agents
orchestrator = SequentialOrchestrator([
    research_agent,
    synthesis_agent,
    writer_agent
])

result = await orchestrator.run("Tell me about Python async patterns")

Pattern 2: Parallel Agent Execution

import asyncio


class ParallelOrchestrator:
    """Execute multiple agents concurrently."""

    def __init__(self, agents: List[Callable]):
        self.agents = agents

    async def run(self, input: str) -> List[Any]:
        """
        Run all agents in parallel with same input.

        Args:
            input: Input for all agents

        Returns:
            List of outputs from each agent
        """
        tasks = [agent(input) for agent in self.agents]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Handle any failures
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                logger.error(f"Agent {i} failed: {result}")

        return results


# Example: Multiple specialized agents
async def technical_reviewer(code: str) -> str:
    """Review code for technical issues."""
    return "technical review..."


async def security_reviewer(code: str) -> str:
    """Review code for security issues."""
    return "security review..."


async def performance_reviewer(code: str) -> str:
    """Review code for performance issues."""
    return "performance review..."


# Run reviewers in parallel
orchestrator = ParallelOrchestrator([
    technical_reviewer,
    security_reviewer,
    performance_reviewer
])

reviews = await orchestrator.run(code_to_review)

Pattern 3: Router-Based Orchestration

from enum import Enum


class AgentType(str, Enum):
    """Available agent types."""
    TECHNICAL = "technical"
    CREATIVE = "creative"
    ANALYTICAL = "analytical"


class RouterOrchestrator:
    """Route requests to appropriate specialized agent."""

    def __init__(self):
        self.agents: Dict[AgentType, Callable] = {}

    def register(self, agent_type: AgentType, agent: Callable):
        """Register an agent."""
        self.agents[agent_type] = agent

    async def classify_request(self, request: str) -> AgentType:
        """
        Classify request to determine which agent to use.

        Args:
            request: User request

        Returns:
            Agent type to handle request
        """
        # Use LLM to classify
        prompt = f"""Classify this request into one of:
        - technical: Code, debugging, technical implementation
        - creative: Writing, brainstorming, creative content
        - analytical: Data analysis, research, evaluation

        Request: {request}

        Return only the category name."""

        category = await llm_classify(prompt)
        return AgentType(category.lower().strip())

    async def route(self, request: str) -> str:
        """
        Route request to appropriate agent.

        Args:
            request: User request

        Returns:
            Response from selected agent
        """
        agent_type = await self.classify_request(request)
        agent = self.agents.get(agent_type)

        if not agent:
            raise ValueError(f"No agent registered for type: {agent_type}")

        logger.info(f"Routing to {agent_type} agent")
        return await agent(request)

Pattern 4: Hierarchical Agent System

class SupervisorAgent:
    """Supervisor that delegates to specialized sub-agents."""

    def __init__(self):
        self.sub_agents: Dict[str, Callable] = {}
        self.state_manager = AgentStateManager()

    async def delegate(
        self,
        task: str,
        state: AgentState
    ) -> str:
        """
        Decompose task and delegate to sub-agents.

        Args:
            task: High-level task description
            state: Current conversation state

        Returns:
            Final result after delegation
        """
        # Plan decomposition using LLM
        plan = await self.plan_task(task, state)

        results = []
        for subtask in plan.subtasks:
            # Find appropriate sub-agent
            agent = self.find_agent_for_task(subtask)

            # Execute subtask
            result = await agent(subtask.description, state)
            results.append(result)

            # Update state
            state.add_message("assistant", f"Subtask result: {result}")

        # Synthesize final result
        return await self.synthesize_results(task, results, state)

    async def plan_task(self, task: str, state: AgentState) -> TaskPlan:
        """Decompose task into subtasks."""
        # Use LLM to plan
        ...

    def find_agent_for_task(self, subtask: SubTask) -> Callable:
        """Select appropriate sub-agent for subtask."""
        # Match subtask to agent capabilities
        ...

Anti-Patterns

# ❌ No input validation
async def tool(input: dict) -> dict:  # Raw dict!
    return await do_something(input["query"])

# ✅ Better: Use Pydantic for validation
async def tool(input: SearchQuery) -> SearchResponse:
    return await do_something(input.query)


# ❌ No error handling in tool execution
async def execute_tool(name: str, input: dict):
    func = tools[name]
    return await func(input)  # Can fail!

# ✅ Better: Comprehensive error handling
async def execute_tool(name: str, input: dict) -> ToolCall:
    try:
        validated = InputModel(**input)
        result = await func(validated)
        return ToolCall(success=True, output=result)
    except ValidationError as e:
        return ToolCall(success=False, error=str(e))


# ❌ No timeout on tool execution
result = await long_running_tool(input)  # Could hang forever!

# ✅ Better: Add timeout
result = await asyncio.wait_for(
    long_running_tool(input),
    timeout=30.0
)


# ❌ Stateless conversations
async def handle_request(prompt: str) -> str:
    return await agent.run(prompt)  # No history!

# ✅ Better: Maintain conversation state
async def handle_request(prompt: str, session_id: str) -> str:
    state = await state_manager.load(session_id)
    state.add_message("user", prompt)
    response = await agent.run(state.get_conversation_history())
    state.add_message("assistant", response)
    await state_manager.save(state)
    return response


# ❌ No logging of tool calls
result = await tool(input)

# ✅ Better: Log all tool executions
logger.info("Executing tool", extra={
    "tool_name": tool.__name__,
    "input": input.model_dump()
})
result = await tool(input)
logger.info("Tool completed", extra={
    "duration_ms": duration,
    "success": True
})

Best Practices Checklist

  • Define tool schemas with Pydantic models
  • Validate all tool inputs before execution
  • Set timeouts on tool execution
  • Handle tool errors gracefully (don't crash)
  • Maintain conversation state across turns
  • Log all tool executions with inputs and outputs
  • Use typed responses from tools
  • Implement retry logic for transient failures
  • Redact sensitive data in tool logs
  • Use async/await throughout agent code
  • Structure agent output as Pydantic models
  • Track agent performance metrics

Auto-Apply

When building multi-agent systems:

  1. Define tool schemas with Pydantic
  2. Implement ToolExecutor for safe execution
  3. Maintain AgentState for conversations
  4. Add comprehensive error handling
  5. Log all agent and tool interactions
  6. Use appropriate orchestration pattern (sequential, parallel, router, hierarchical)
  7. Set timeouts on all agent operations
  • pydantic-models - For tool schema definition
  • async-await-checker - For async agent patterns
  • llm-app-architecture - For LLM integration
  • structured-errors - For error handling
  • observability-logging - For agent logging
  • type-safety - For type-safe tool definitions