12 KiB
Telemetry Integration Examples
This document provides practical examples of integrating telemetry tracking into Betty Framework CLI entrypoints, workflows, and agents.
Quick Start
The Betty Framework provides two main approaches for telemetry integration:
- Decorator Pattern - For CLI entrypoints with standard main() functions
- Manual Capture - For workflows, agents, and custom integrations
1. Decorator Pattern (Recommended for CLI)
Standard CLI with Return Code
For CLI entrypoints that return an exit code:
#!/usr/bin/env python3
import sys
import os
from typing import Optional, List
# Ensure project root on path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
from betty.logging_utils import setup_logger
from betty.telemetry_integration import telemetry_tracked
logger = setup_logger(__name__)
@telemetry_tracked(skill_name="my.skill", caller="cli")
def main(argv: Optional[List[str]] = None) -> int:
"""Entry point for CLI execution."""
argv = argv or sys.argv[1:]
# Your skill logic here
if not argv:
logger.error("Missing required arguments")
return 1
# Process and return success
return 0
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
What the decorator does:
- Automatically measures execution time
- Captures success/failure based on return code (0 = success, non-zero = failed)
- Logs telemetry with sanitized inputs
- Non-blocking - telemetry failures don't affect the main function
Example: workflow.validate Integration
The workflow.validate skill has been updated with telemetry tracking:
# skills/workflow.validate/workflow_validate.py
from betty.telemetry_integration import telemetry_tracked
@telemetry_tracked(skill_name="workflow.validate", caller="cli")
def main(argv: Optional[List[str]] = None) -> int:
"""Entry point for CLI execution."""
# ... existing validation logic ...
return 0 if response["ok"] else 1
Test it:
python3 skills/workflow.validate/workflow_validate.py example.yaml
cat registry/telemetry.json
2. Manual Capture Pattern
Direct Function Call
For programmatic telemetry capture:
from skills.telemetry.capture.telemetry_capture import create_telemetry_entry, capture_telemetry
import time
# Track execution manually
start_time = time.time()
try:
# Your logic here
result = execute_my_skill(param1, param2)
status = "success"
except Exception as e:
status = "failed"
raise
finally:
duration_ms = int((time.time() - start_time) * 1000)
entry = create_telemetry_entry(
skill_name="my.skill",
inputs={"param1": param1, "param2": param2},
status=status,
duration_ms=duration_ms,
caller="api"
)
capture_telemetry(entry)
Workflow Execution Tracking
For capturing telemetry within workflow execution:
# In workflow.compose or similar workflow executor
from betty.telemetry_integration import track_skill_execution
def execute_workflow_step(step_config: dict, workflow_name: str):
"""Execute a workflow step with telemetry tracking."""
skill_name = step_config["skill"]
skill_args = step_config["args"]
result = track_skill_execution(
skill_name=skill_name,
func=lambda: run_skill(skill_name, skill_args),
inputs={"args": skill_args},
workflow=workflow_name,
caller="workflow"
)
return result
Agent Skill Invocation
For tracking skills invoked by agents:
# In agent skill runner
from betty.telemetry_integration import track_skill_execution
def run_agent_skill(agent_name: str, skill_name: str, **kwargs):
"""Run a skill on behalf of an agent with telemetry."""
result = track_skill_execution(
skill_name=skill_name,
func=lambda: execute_skill(skill_name, **kwargs),
inputs=kwargs,
agent=agent_name,
caller="agent"
)
return result
3. CLI Helper Function
For simple CLI telemetry without decorators:
from betty.telemetry_integration import capture_cli_telemetry
import time
def main():
start_time = time.time()
status = "failed"
try:
# Your CLI logic
process_command()
status = "success"
return 0
except Exception as e:
logger.error(f"Error: {e}")
return 1
finally:
duration_ms = int((time.time() - start_time) * 1000)
capture_cli_telemetry(
skill_name="my.skill",
inputs={"cli_args": sys.argv[1:]},
status=status,
duration_ms=duration_ms,
caller="cli"
)
4. Context-Rich Telemetry
Full Context Example
Capture comprehensive context for deep analytics:
entry = create_telemetry_entry(
skill_name="agent.define",
inputs={
"name": "my-agent",
"mode": "iterative",
"capabilities": ["code-gen", "testing"]
},
status="success",
duration_ms=2500,
agent="meta-orchestrator", # Which agent invoked this
workflow="agent-creation-pipeline", # Which workflow this is part of
caller="api", # How it was invoked
# Custom fields for advanced analytics
user_id="dev-123",
environment="staging",
version="1.0.0"
)
capture_telemetry(entry)
5. Batch Operations
For tracking multiple skill executions in batch:
from skills.telemetry.capture.telemetry_capture import create_telemetry_entry, capture_telemetry
import time
def process_batch(items: list):
"""Process items in batch with per-item telemetry."""
for item in items:
start_time = time.time()
try:
process_item(item)
status = "success"
except Exception as e:
status = "failed"
duration_ms = int((time.time() - start_time) * 1000)
# Capture telemetry for each item
entry = create_telemetry_entry(
skill_name="batch.processor",
inputs={"item_id": item.id},
status=status,
duration_ms=duration_ms,
caller="batch"
)
capture_telemetry(entry)
6. Error Handling Best Practices
Graceful Telemetry Failures
Always wrap telemetry in try-except to prevent failures from affecting main logic:
def my_critical_function():
"""Critical function that must not fail due to telemetry issues."""
start_time = time.time()
result = None
try:
# Critical business logic
result = perform_critical_operation()
status = "success"
except Exception as e:
status = "failed"
raise # Re-raise the original error
finally:
# Capture telemetry (failures are logged but don't interrupt)
try:
duration_ms = int((time.time() - start_time) * 1000)
capture_cli_telemetry(
skill_name="critical.operation",
inputs={},
status=status,
duration_ms=duration_ms
)
except Exception as telemetry_error:
logger.warning(f"Telemetry capture failed: {telemetry_error}")
# Don't raise - telemetry is not critical
return result
7. Sensitive Data Protection
Sanitize Inputs
Never capture sensitive data like passwords, tokens, or PII:
def sanitize_inputs(inputs: dict) -> dict:
"""Remove sensitive fields from inputs before logging."""
sensitive_keys = ["password", "token", "api_key", "secret", "ssn", "credit_card"]
sanitized = {}
for key, value in inputs.items():
if any(sensitive in key.lower() for sensitive in sensitive_keys):
sanitized[key] = "***REDACTED***"
else:
sanitized[key] = value
return sanitized
# Use it
entry = create_telemetry_entry(
skill_name="auth.login",
inputs=sanitize_inputs({"username": "john", "password": "secret123"}),
status="success",
duration_ms=150
)
8. Testing with Telemetry
Unit Test Example
import unittest
from unittest.mock import patch, MagicMock
class TestMySkillWithTelemetry(unittest.TestCase):
@patch('betty.telemetry_integration.capture_cli_telemetry')
def test_skill_execution_captures_telemetry(self, mock_capture):
"""Test that telemetry is captured on skill execution."""
# Execute the skill
from skills.my.skill.my_skill import main
result = main(["arg1", "arg2"])
# Verify telemetry was captured
self.assertEqual(result, 0)
mock_capture.assert_called_once()
# Verify telemetry parameters
call_args = mock_capture.call_args
self.assertEqual(call_args.kwargs['skill_name'], "my.skill")
self.assertEqual(call_args.kwargs['status'], "success")
self.assertGreater(call_args.kwargs['duration_ms'], 0)
9. Migration Checklist
To add telemetry to an existing skill:
-
Import the telemetry integration:
from betty.telemetry_integration import telemetry_tracked -
Apply the decorator to main():
@telemetry_tracked(skill_name="your.skill", caller="cli") def main(argv: Optional[List[str]] = None) -> int: -
Ensure main() returns an int:
- Return 0 for success
- Return non-zero for failure
-
Test the integration:
python3 skills/your.skill/your_skill.py test-args cat registry/telemetry.json | jq '.[-1]' # View latest entry
10. Querying Telemetry Data
Basic Queries
# Count total telemetry entries
jq 'length' registry/telemetry.json
# Find all failed executions
jq '.[] | select(.status == "failed")' registry/telemetry.json
# Get average duration for a skill
jq '[.[] | select(.skill == "workflow.validate") | .duration_ms] | add / length' registry/telemetry.json
# Top 10 slowest executions
jq 'sort_by(.duration_ms) | reverse | .[0:10]' registry/telemetry.json
# Executions by caller
jq 'group_by(.caller) | map({caller: .[0].caller, count: length})' registry/telemetry.json
Advanced Analytics
# Skills executed in last hour
jq --arg cutoff "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%S)" \
'.[] | select(.timestamp > $cutoff)' registry/telemetry.json
# Success rate by skill
jq 'group_by(.skill) | map({
skill: .[0].skill,
total: length,
successful: ([.[] | select(.status == "success")] | length),
success_rate: (([.[] | select(.status == "success")] | length) / length * 100)
})' registry/telemetry.json
11. Future: Prometheus Export
Example of how telemetry data could be exported to Prometheus (future implementation):
# Future: skills/telemetry.capture/exporters/prometheus.py
from prometheus_client import Counter, Histogram, Gauge
skill_executions = Counter(
'betty_skill_executions_total',
'Total skill executions',
['skill', 'status', 'caller']
)
skill_duration = Histogram(
'betty_skill_duration_seconds',
'Skill execution duration',
['skill', 'caller']
)
def export_to_prometheus(telemetry_entry: dict):
"""Export telemetry entry to Prometheus metrics."""
skill_executions.labels(
skill=telemetry_entry['skill'],
status=telemetry_entry['status'],
caller=telemetry_entry.get('caller', 'unknown')
).inc()
skill_duration.labels(
skill=telemetry_entry['skill'],
caller=telemetry_entry.get('caller', 'unknown')
).observe(telemetry_entry['duration_ms'] / 1000.0)
Summary
- CLI Skills: Use
@telemetry_trackeddecorator - Workflows: Use
track_skill_execution()helper - Custom Code: Use
create_telemetry_entry()+capture_telemetry() - Always: Handle telemetry failures gracefully
- Never: Capture sensitive data in inputs
For more details, see the SKILL.md documentation.