Files
gh-epieczko-betty/skills/telemetry.capture/INTEGRATION_EXAMPLES.md
2025-11-29 18:26:08 +08:00

12 KiB

Telemetry Integration Examples

This document provides practical examples of integrating telemetry tracking into Betty Framework CLI entrypoints, workflows, and agents.

Quick Start

The Betty Framework provides two main approaches for telemetry integration:

  1. Decorator Pattern - For CLI entrypoints with standard main() functions
  2. Manual Capture - For workflows, agents, and custom integrations

Standard CLI with Return Code

For CLI entrypoints that return an exit code:

#!/usr/bin/env python3
import sys
import os
from typing import Optional, List

# Ensure project root on path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))

from betty.logging_utils import setup_logger
from betty.telemetry_integration import telemetry_tracked

logger = setup_logger(__name__)

@telemetry_tracked(skill_name="my.skill", caller="cli")
def main(argv: Optional[List[str]] = None) -> int:
    """Entry point for CLI execution."""
    argv = argv or sys.argv[1:]

    # Your skill logic here
    if not argv:
        logger.error("Missing required arguments")
        return 1

    # Process and return success
    return 0

if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))

What the decorator does:

  • Automatically measures execution time
  • Captures success/failure based on return code (0 = success, non-zero = failed)
  • Logs telemetry with sanitized inputs
  • Non-blocking - telemetry failures don't affect the main function

Example: workflow.validate Integration

The workflow.validate skill has been updated with telemetry tracking:

# skills/workflow.validate/workflow_validate.py
from betty.telemetry_integration import telemetry_tracked

@telemetry_tracked(skill_name="workflow.validate", caller="cli")
def main(argv: Optional[List[str]] = None) -> int:
    """Entry point for CLI execution."""
    # ... existing validation logic ...
    return 0 if response["ok"] else 1

Test it:

python3 skills/workflow.validate/workflow_validate.py example.yaml
cat registry/telemetry.json

2. Manual Capture Pattern

Direct Function Call

For programmatic telemetry capture:

from skills.telemetry.capture.telemetry_capture import create_telemetry_entry, capture_telemetry
import time

# Track execution manually
start_time = time.time()
try:
    # Your logic here
    result = execute_my_skill(param1, param2)
    status = "success"
except Exception as e:
    status = "failed"
    raise
finally:
    duration_ms = int((time.time() - start_time) * 1000)

    entry = create_telemetry_entry(
        skill_name="my.skill",
        inputs={"param1": param1, "param2": param2},
        status=status,
        duration_ms=duration_ms,
        caller="api"
    )
    capture_telemetry(entry)

Workflow Execution Tracking

For capturing telemetry within workflow execution:

# In workflow.compose or similar workflow executor
from betty.telemetry_integration import track_skill_execution

def execute_workflow_step(step_config: dict, workflow_name: str):
    """Execute a workflow step with telemetry tracking."""

    skill_name = step_config["skill"]
    skill_args = step_config["args"]

    result = track_skill_execution(
        skill_name=skill_name,
        func=lambda: run_skill(skill_name, skill_args),
        inputs={"args": skill_args},
        workflow=workflow_name,
        caller="workflow"
    )

    return result

Agent Skill Invocation

For tracking skills invoked by agents:

# In agent skill runner
from betty.telemetry_integration import track_skill_execution

def run_agent_skill(agent_name: str, skill_name: str, **kwargs):
    """Run a skill on behalf of an agent with telemetry."""

    result = track_skill_execution(
        skill_name=skill_name,
        func=lambda: execute_skill(skill_name, **kwargs),
        inputs=kwargs,
        agent=agent_name,
        caller="agent"
    )

    return result

3. CLI Helper Function

For simple CLI telemetry without decorators:

from betty.telemetry_integration import capture_cli_telemetry
import time

def main():
    start_time = time.time()
    status = "failed"

    try:
        # Your CLI logic
        process_command()
        status = "success"
        return 0
    except Exception as e:
        logger.error(f"Error: {e}")
        return 1
    finally:
        duration_ms = int((time.time() - start_time) * 1000)
        capture_cli_telemetry(
            skill_name="my.skill",
            inputs={"cli_args": sys.argv[1:]},
            status=status,
            duration_ms=duration_ms,
            caller="cli"
        )

4. Context-Rich Telemetry

Full Context Example

Capture comprehensive context for deep analytics:

entry = create_telemetry_entry(
    skill_name="agent.define",
    inputs={
        "name": "my-agent",
        "mode": "iterative",
        "capabilities": ["code-gen", "testing"]
    },
    status="success",
    duration_ms=2500,
    agent="meta-orchestrator",           # Which agent invoked this
    workflow="agent-creation-pipeline",  # Which workflow this is part of
    caller="api",                        # How it was invoked

    # Custom fields for advanced analytics
    user_id="dev-123",
    environment="staging",
    version="1.0.0"
)

capture_telemetry(entry)

5. Batch Operations

For tracking multiple skill executions in batch:

from skills.telemetry.capture.telemetry_capture import create_telemetry_entry, capture_telemetry
import time

def process_batch(items: list):
    """Process items in batch with per-item telemetry."""

    for item in items:
        start_time = time.time()

        try:
            process_item(item)
            status = "success"
        except Exception as e:
            status = "failed"

        duration_ms = int((time.time() - start_time) * 1000)

        # Capture telemetry for each item
        entry = create_telemetry_entry(
            skill_name="batch.processor",
            inputs={"item_id": item.id},
            status=status,
            duration_ms=duration_ms,
            caller="batch"
        )
        capture_telemetry(entry)

6. Error Handling Best Practices

Graceful Telemetry Failures

Always wrap telemetry in try-except to prevent failures from affecting main logic:

def my_critical_function():
    """Critical function that must not fail due to telemetry issues."""

    start_time = time.time()
    result = None

    try:
        # Critical business logic
        result = perform_critical_operation()
        status = "success"
    except Exception as e:
        status = "failed"
        raise  # Re-raise the original error
    finally:
        # Capture telemetry (failures are logged but don't interrupt)
        try:
            duration_ms = int((time.time() - start_time) * 1000)
            capture_cli_telemetry(
                skill_name="critical.operation",
                inputs={},
                status=status,
                duration_ms=duration_ms
            )
        except Exception as telemetry_error:
            logger.warning(f"Telemetry capture failed: {telemetry_error}")
            # Don't raise - telemetry is not critical

    return result

7. Sensitive Data Protection

Sanitize Inputs

Never capture sensitive data like passwords, tokens, or PII:

def sanitize_inputs(inputs: dict) -> dict:
    """Remove sensitive fields from inputs before logging."""

    sensitive_keys = ["password", "token", "api_key", "secret", "ssn", "credit_card"]

    sanitized = {}
    for key, value in inputs.items():
        if any(sensitive in key.lower() for sensitive in sensitive_keys):
            sanitized[key] = "***REDACTED***"
        else:
            sanitized[key] = value

    return sanitized

# Use it
entry = create_telemetry_entry(
    skill_name="auth.login",
    inputs=sanitize_inputs({"username": "john", "password": "secret123"}),
    status="success",
    duration_ms=150
)

8. Testing with Telemetry

Unit Test Example

import unittest
from unittest.mock import patch, MagicMock

class TestMySkillWithTelemetry(unittest.TestCase):

    @patch('betty.telemetry_integration.capture_cli_telemetry')
    def test_skill_execution_captures_telemetry(self, mock_capture):
        """Test that telemetry is captured on skill execution."""

        # Execute the skill
        from skills.my.skill.my_skill import main
        result = main(["arg1", "arg2"])

        # Verify telemetry was captured
        self.assertEqual(result, 0)
        mock_capture.assert_called_once()

        # Verify telemetry parameters
        call_args = mock_capture.call_args
        self.assertEqual(call_args.kwargs['skill_name'], "my.skill")
        self.assertEqual(call_args.kwargs['status'], "success")
        self.assertGreater(call_args.kwargs['duration_ms'], 0)

9. Migration Checklist

To add telemetry to an existing skill:

  1. Import the telemetry integration:

    from betty.telemetry_integration import telemetry_tracked
    
  2. Apply the decorator to main():

    @telemetry_tracked(skill_name="your.skill", caller="cli")
    def main(argv: Optional[List[str]] = None) -> int:
    
  3. Ensure main() returns an int:

    • Return 0 for success
    • Return non-zero for failure
  4. Test the integration:

    python3 skills/your.skill/your_skill.py test-args
    cat registry/telemetry.json | jq '.[-1]'  # View latest entry
    

10. Querying Telemetry Data

Basic Queries

# Count total telemetry entries
jq 'length' registry/telemetry.json

# Find all failed executions
jq '.[] | select(.status == "failed")' registry/telemetry.json

# Get average duration for a skill
jq '[.[] | select(.skill == "workflow.validate") | .duration_ms] | add / length' registry/telemetry.json

# Top 10 slowest executions
jq 'sort_by(.duration_ms) | reverse | .[0:10]' registry/telemetry.json

# Executions by caller
jq 'group_by(.caller) | map({caller: .[0].caller, count: length})' registry/telemetry.json

Advanced Analytics

# Skills executed in last hour
jq --arg cutoff "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%S)" \
   '.[] | select(.timestamp > $cutoff)' registry/telemetry.json

# Success rate by skill
jq 'group_by(.skill) | map({
    skill: .[0].skill,
    total: length,
    successful: ([.[] | select(.status == "success")] | length),
    success_rate: (([.[] | select(.status == "success")] | length) / length * 100)
})' registry/telemetry.json

11. Future: Prometheus Export

Example of how telemetry data could be exported to Prometheus (future implementation):

# Future: skills/telemetry.capture/exporters/prometheus.py
from prometheus_client import Counter, Histogram, Gauge

skill_executions = Counter(
    'betty_skill_executions_total',
    'Total skill executions',
    ['skill', 'status', 'caller']
)

skill_duration = Histogram(
    'betty_skill_duration_seconds',
    'Skill execution duration',
    ['skill', 'caller']
)

def export_to_prometheus(telemetry_entry: dict):
    """Export telemetry entry to Prometheus metrics."""
    skill_executions.labels(
        skill=telemetry_entry['skill'],
        status=telemetry_entry['status'],
        caller=telemetry_entry.get('caller', 'unknown')
    ).inc()

    skill_duration.labels(
        skill=telemetry_entry['skill'],
        caller=telemetry_entry.get('caller', 'unknown')
    ).observe(telemetry_entry['duration_ms'] / 1000.0)

Summary

  • CLI Skills: Use @telemetry_tracked decorator
  • Workflows: Use track_skill_execution() helper
  • Custom Code: Use create_telemetry_entry() + capture_telemetry()
  • Always: Handle telemetry failures gracefully
  • Never: Capture sensitive data in inputs

For more details, see the SKILL.md documentation.