# Files
# gh-francyjglisboa-agent-ski…/integrations/agentdb_real_integration.py
# 2025-11-29 18:27:25 +08:00

# 717 lines
# 24 KiB
# Python

#!/usr/bin/env python3
"""
Real AgentDB Integration - TypeScript/Python Bridge
This module provides real integration with AgentDB CLI, handling the TypeScript/Python
communication barrier while maintaining the "invisible intelligence" experience.
Architecture: Python <-> CLI Bridge <-> AgentDB (TypeScript/Node.js)
"""
import json
import logging
import os
import re
import subprocess
import tempfile
import time
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, List, Optional, Union
# Module-level logger used by the bridge to report CLI failures and progress.
logger = logging.getLogger(__name__)
@dataclass
class Episode:
    """Python-side record of an AgentDB reflexion episode.

    Only ``session_id`` and ``task`` are required; every other field has a
    neutral default (``None``, ``0.0`` or ``False``).
    """

    # Identity of the attempt.
    session_id: str
    task: str
    # Optional free-text payloads of the attempt.
    input: Optional[str] = None
    output: Optional[str] = None
    critique: Optional[str] = None
    # Outcome of the attempt.
    reward: float = 0.0
    success: bool = False
    # Optional measurements (latency in milliseconds, token count).
    latency_ms: Optional[int] = None
    tokens_used: Optional[int] = None
    # Optional extra annotations.
    tags: Optional[List[str]] = None
    metadata: Optional[Dict[str, Any]] = None
@dataclass
class Skill:
    """Python-side record of an AgentDB skill.

    ``name`` is required; the remaining fields default to ``None`` or zero.
    """

    # What the skill is.
    name: str
    description: Optional[str] = None
    code: Optional[str] = None
    signature: Optional[Dict[str, Any]] = None
    # Usage statistics.
    success_rate: float = 0.0
    uses: int = 0
    avg_reward: float = 0.0
    avg_latency_ms: int = 0
    # Optional extra annotations.
    metadata: Optional[Dict[str, Any]] = None
@dataclass
class CausalEdge:
    """Python-side record of an AgentDB causal edge (cause -> effect).

    ``cause``, ``effect`` and ``uplift`` are required; ``confidence``
    defaults to ``0.5`` and the rest to ``None``.
    """

    # The relationship and its measured effect size.
    cause: str
    effect: str
    uplift: float
    # Strength of the evidence behind the edge.
    confidence: float = 0.5
    sample_size: Optional[int] = None
    # Optional explanation and extra annotations.
    mechanism: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None
class AgentDBCLIException(Exception):
    """Raised when invoking or interpreting the AgentDB CLI fails."""
class RealAgentDBBridge:
    """
    Real bridge to AgentDB CLI, providing Python interface while maintaining
    the "invisible intelligence" experience for users.

    Every operation shells out to the ``agentdb`` executable and parses its
    text output. Public methods are best-effort: CLI failures are logged and
    reported as ``None`` / ``[]`` / ``{}`` instead of being raised.
    """

    # CSI escape sequences (colours, bold, ...) the CLI may emit. They are
    # stripped before parsing so IDs and numbers convert cleanly.
    _ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-9;?]*[ -/]*[@-~]")
    # (marker text that precedes a new record's ID, key used in result["data"]).
    _ID_MARKERS = (
        ("Stored episode #", "episode_id"),
        ("Created skill #", "skill_id"),
        ("Added causal edge #", "edge_id"),
    )
    # Skill result headers such as "#1: skill_name" or "#2 skill_name".
    _SKILL_HEADER_RE = re.compile(r"^#\d+(?:\s*[:.)]\s*|\s+)(\S.*)$")
    # Separators accepted between cause and effect in causal query output.
    _CAUSAL_ARROWS = ("→", "->")
    _UPLIFT_RE = re.compile(r"uplift:\s*([-+]?\d*\.?\d+)", re.IGNORECASE)
    _CONFIDENCE_RE = re.compile(r"confidence:\s*([-+]?\d*\.?\d+)", re.IGNORECASE)
    # "Created 3 skills", "Created 1 new skill", ... from `skill consolidate`.
    _CREATED_SKILLS_RE = re.compile(r"\bCreated\s+(\d+)\b[^\n]*skill")
    # Counters surfaced from `db stats`, matched as key prefixes in this order.
    _STAT_KEYS = ("causal_edges", "episodes", "causal_experiments")
    # "Key: value" label -> (output field, converter) for parsed records.
    # A value whose converter raises ValueError is skipped.
    _EPISODE_FIELDS = {
        "Task": ("task", str),
        "Reward": ("reward", float),
        "Success": ("success", lambda value: "Yes" in value),
        "Similarity": ("similarity", float),
        "Critique": ("critique", str),
    }
    _SKILL_FIELDS = {
        "Description": ("description", str),
        "Success Rate": ("success_rate", lambda value: float(value.replace("%", "")) / 100),
        "Uses": ("uses", int),
        "Avg Reward": ("avg_reward", float),
    }

    def __init__(self, db_path: Optional[str] = None):
        """
        Initialize the real AgentDB bridge.

        Args:
            db_path: Path to AgentDB database file (default: ./agentdb.db).
                It is exported to the CLI as the ``AGENTDB_PATH`` variable.
        """
        self.db_path = db_path or "./agentdb.db"
        self.is_available = self._check_agentdb_availability()
        self._setup_environment()

    def _check_agentdb_availability(self) -> bool:
        """Return True if ``agentdb --help`` runs and exits with status 0.

        A missing or unexecutable binary (any ``OSError``) or a timeout counts
        as "not available" and never raises.
        """
        try:
            result = subprocess.run(
                ["agentdb", "--help"],
                capture_output=True,
                text=True,
                # The CLI prints emoji; decode as UTF-8 regardless of locale.
                encoding="utf-8",
                errors="replace",
                timeout=10,
            )
        except (subprocess.TimeoutExpired, OSError):
            logger.warning("AgentDB CLI not available")
            return False
        return result.returncode == 0

    def _setup_environment(self):
        """Build the subprocess environment: current env plus AGENTDB_PATH."""
        env = os.environ.copy()
        env["AGENTDB_PATH"] = self.db_path
        self.env = env

    def _run_agentdb_command(self, command: List[str], timeout: int = 30) -> Dict[str, Any]:
        """
        Execute AgentDB CLI command and parse output.

        Args:
            command: Command components (without the leading ``agentdb``).
            timeout: Command timeout in seconds.

        Returns:
            Parsed result dictionary (see ``_parse_agentdb_output``).

        Raises:
            AgentDBCLIException: If the CLI is unavailable, cannot be started,
                times out, exits non-zero, or its output cannot be parsed.
        """
        if not self.is_available:
            raise AgentDBCLIException("AgentDB CLI not available")

        full_command = ["agentdb"] + command
        logger.debug(f"Running AgentDB command: {' '.join(full_command)}")
        try:
            result = subprocess.run(
                full_command,
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
                timeout=timeout,
                env=self.env,
            )
        except subprocess.TimeoutExpired as e:
            raise AgentDBCLIException(f"AgentDB command timed out: {' '.join(command)}") from e
        except (OSError, ValueError) as e:
            raise AgentDBCLIException(f"Error executing AgentDB command: {str(e)}") from e

        if result.returncode != 0:
            # Some CLIs report errors on stdout; fall back to it when stderr is empty.
            detail = result.stderr.strip() or result.stdout.strip()
            error_msg = f"AgentDB command failed (exit {result.returncode}): {detail}"
            logger.error(error_msg)
            raise AgentDBCLIException(error_msg)

        try:
            return self._parse_agentdb_output(result.stdout)
        except Exception as e:
            # Keep the "only AgentDBCLIException escapes" contract callers rely on.
            raise AgentDBCLIException(f"Error executing AgentDB command: {str(e)}") from e

    @classmethod
    def _strip_ansi(cls, text: str) -> str:
        """Remove ANSI CSI escape sequences from ``text``."""
        return cls._ANSI_ESCAPE_RE.sub("", text)

    @staticmethod
    def _append_optional_positionals(command: List[str], values: List[Any]) -> None:
        """Append optional positional CLI arguments while preserving their slots.

        Trailing missing values (``None`` or ``""``) are dropped. A missing value
        that precedes a present one is sent as ``""`` so the later argument keeps
        its position instead of shifting into the earlier slot.
        """
        tail = list(values)
        while tail and (tail[-1] is None or tail[-1] == ""):
            tail.pop()
        command.extend("" if value is None else str(value) for value in tail)

    @staticmethod
    def _extract_record_id(text: str, marker: str) -> Optional[Union[int, str]]:
        """Return the token following ``marker`` on the same line.

        The token is an int when numeric, otherwise the raw string (trailing
        punctuation removed), or ``None`` if nothing follows the marker.
        """
        match = re.search(re.escape(marker) + r"[ \t]*(\S+)", text)
        if match is None:
            return None
        token = match.group(1).rstrip(".,;:!")
        if not token:
            return None
        try:
            return int(token)
        except ValueError:
            return token

    @staticmethod
    def _apply_field(record: Dict[str, Any], key: str, value: str,
                     fields: Dict[str, Any]) -> None:
        """Store ``value`` in ``record`` if ``key`` is a known label and it converts."""
        spec = fields.get(key)
        if spec is None:
            return
        name, convert = spec
        try:
            record[name] = convert(value)
        except ValueError:
            pass

    def _parse_agentdb_output(self, output: str) -> Dict[str, Any]:
        """
        Parse AgentDB CLI output into structured data.

        If any line is a standalone JSON object, that object is returned as-is.
        Otherwise returns ``{"raw_output": output, "success": True, "data": {...}}``
        where ``data`` holds whatever was recognised: a new record ID
        (``episode_id`` / ``skill_id`` / ``edge_id``), retrieved ``episodes``,
        or matching ``skills``. ``raw_output`` is the unmodified CLI text, while
        all matching is done on an ANSI-stripped copy.

        This is a heuristic parser for human-oriented output. Unrecognised
        formats yield an empty ``data`` dict.
        """
        clean = self._strip_ansi(output)
        for line in clean.strip().split("\n"):
            line = line.strip()
            if line.startswith("{") and line.endswith("}"):
                try:
                    return json.loads(line)
                except json.JSONDecodeError:
                    continue

        result: Dict[str, Any] = {"raw_output": output, "success": True, "data": {}}
        data = result["data"]

        # Creation confirmations: the first marker found decides the result.
        for marker, key in self._ID_MARKERS:
            if marker in clean:
                record_id = self._extract_record_id(clean, marker)
                if record_id is not None:
                    data[key] = record_id
                return result

        if "Retrieved" in clean and "relevant episodes" in clean:
            data["episodes"] = self._parse_episodes_output(clean)
        elif "Found" in clean and "matching skills" in clean:
            data["skills"] = self._parse_skills_output(clean)
        return result

    def _parse_episodes_output(self, output: str) -> List[Dict[str, Any]]:
        """Parse episodes from AgentDB ``reflexion retrieve`` output.

        A line starting with "#" that mentions "Episode" opens a new record.
        Following "Key: value" lines populate it (task, reward, success,
        similarity, critique).
        """
        episodes: List[Dict[str, Any]] = []
        current: Dict[str, Any] = {}
        for line in self._strip_ansi(output).split("\n"):
            line = line.strip()
            if line.startswith("#") and "Episode" in line:
                if current:
                    episodes.append(current)
                tokens = line.split()
                # NOTE(review): the ID is read from the second token, as before;
                # fall back to the first so a one-token header cannot raise.
                id_token = tokens[1] if len(tokens) > 1 else tokens[0].lstrip("#")
                current = {"episode_id": id_token.replace(":", "")}
            elif ":" in line and current:
                key, value = line.split(":", 1)
                self._apply_field(current, key.strip(), value.strip(), self._EPISODE_FIELDS)
        if current:
            episodes.append(current)
        return episodes

    def _parse_skills_output(self, output: str) -> List[Dict[str, Any]]:
        """Parse skills from AgentDB ``skill search`` output.

        A header such as ``#1: skill_name`` opens a new record named after the
        text following the rank. Following "Key: value" lines populate it
        (description, success_rate as a 0-1 fraction, uses, avg_reward).
        """
        skills: List[Dict[str, Any]] = []
        current: Dict[str, Any] = {}
        for line in self._strip_ansi(output).split("\n"):
            line = line.strip()
            header = self._SKILL_HEADER_RE.match(line)
            if header:
                if current:
                    skills.append(current)
                current = {"name": header.group(1).strip()}
            elif ":" in line and current:
                key, value = line.split(":", 1)
                self._apply_field(current, key.strip(), value.strip(), self._SKILL_FIELDS)
        if current:
            skills.append(current)
        return skills

    # Reflexion Memory Methods

    def store_episode(self, episode: "Episode") -> Optional[int]:
        """
        Store a reflexion episode in AgentDB.

        Args:
            episode: Episode to store. ``tags`` and ``metadata`` are not sent.

        Returns:
            Episode ID if successful, None otherwise.
        """
        command = [
            "reflexion", "store",
            episode.session_id,
            episode.task,
            str(episode.reward),
            "true" if episode.success else "false",
        ]
        # Optional positionals, in CLI order; gaps are filled so nothing shifts.
        self._append_optional_positionals(command, [
            episode.critique,
            episode.input,
            episode.output,
            episode.latency_ms,
            episode.tokens_used,
        ])
        try:
            result = self._run_agentdb_command(command)
        except AgentDBCLIException as e:
            logger.error(f"Failed to store episode: {e}")
            return None
        return result.get("data", {}).get("episode_id")

    def retrieve_episodes(self, task: str, k: int = 5, min_reward: float = 0.0,
                          only_failures: bool = False, only_successes: bool = False) -> List[Dict[str, Any]]:
        """
        Retrieve relevant episodes from AgentDB.

        Args:
            task: Task description.
            k: Maximum number of episodes to retrieve.
            min_reward: Minimum reward threshold.
            only_failures: Only retrieve failed episodes (takes precedence).
            only_successes: Only retrieve successful episodes.

        Returns:
            List of episodes, or [] on failure.
        """
        command = ["reflexion", "retrieve", task, str(k), str(min_reward)]
        if only_failures:
            command.append("true")
        elif only_successes:
            command.append("false")
        try:
            result = self._run_agentdb_command(command)
        except AgentDBCLIException as e:
            logger.error(f"Failed to retrieve episodes: {e}")
            return []
        episodes = result.get("data", {}).get("episodes", [])
        # NOTE(review): the positional flag above does not obviously ask the CLI
        # for successes only, so the requested outcome is also enforced here.
        # Episodes whose success could not be parsed are kept.
        if only_failures:
            episodes = [ep for ep in episodes if ep.get("success") is not True]
        elif only_successes:
            episodes = [ep for ep in episodes if ep.get("success") is not False]
        return episodes

    def get_critique_summary(self, task: str, only_failures: bool = False) -> Optional[str]:
        """Get the critique summary text for a task, or None on CLI failure."""
        command = ["reflexion", "critique-summary", task]
        if only_failures:
            command.append("true")
        try:
            result = self._run_agentdb_command(command)
        except AgentDBCLIException as e:
            logger.error(f"Failed to get critique summary: {e}")
            return None
        # The summary is free text in the raw output. Return it whole
        # (ANSI-stripped) rather than guess at section separators; splitting on
        # an empty separator raised ValueError.
        return self._strip_ansi(result.get("raw_output", "")).strip()

    # Skill Library Methods

    def create_skill(self, skill: "Skill") -> Optional[int]:
        """
        Create a skill in AgentDB.

        Args:
            skill: Skill to create. Only name, description and code are sent.

        Returns:
            Skill ID if successful, None otherwise.
        """
        command = ["skill", "create", skill.name]
        # Keep code in its own slot even when no description is given.
        self._append_optional_positionals(command, [skill.description, skill.code])
        try:
            result = self._run_agentdb_command(command)
        except AgentDBCLIException as e:
            logger.error(f"Failed to create skill: {e}")
            return None
        return result.get("data", {}).get("skill_id")

    def search_skills(self, query: str, k: int = 5, min_success_rate: float = 0.0) -> List[Dict[str, Any]]:
        """
        Search for skills in AgentDB.

        Args:
            query: Search query.
            k: Maximum number of skills to retrieve.
            min_success_rate: Minimum success rate threshold (0-1). It is
                applied here to parsed results; skills without a parsed
                success rate are kept.

        Returns:
            List of skills, or [] on failure.
        """
        command = ["skill", "search", query, str(k)]
        try:
            result = self._run_agentdb_command(command)
        except AgentDBCLIException as e:
            logger.error(f"Failed to search skills: {e}")
            return []
        skills = result.get("data", {}).get("skills", [])
        return [
            skill for skill in skills
            if skill.get("success_rate") is None or skill["success_rate"] >= min_success_rate
        ]

    def consolidate_skills(self, min_attempts: int = 3, min_reward: float = 0.7,
                           time_window_days: int = 7) -> Optional[int]:
        """
        Consolidate episodes into skills.

        Args:
            min_attempts: Minimum number of attempts.
            min_reward: Minimum reward threshold.
            time_window_days: Time window in days.

        Returns:
            Number of skills created (0 if no count is reported) if successful,
            None otherwise.
        """
        command = [
            "skill", "consolidate",
            str(min_attempts),
            str(min_reward),
            str(time_window_days),
        ]
        try:
            result = self._run_agentdb_command(command)
        except AgentDBCLIException as e:
            logger.error(f"Failed to consolidate skills: {e}")
            return None
        # Matches lines like "Created 3 skills" or "Created 1 new skill".
        match = self._CREATED_SKILLS_RE.search(self._strip_ansi(result.get("raw_output", "")))
        return int(match.group(1)) if match else 0

    # Causal Memory Methods

    def add_causal_edge(self, edge: "CausalEdge") -> Optional[int]:
        """
        Add a causal edge to AgentDB.

        Args:
            edge: Causal edge to add. mechanism and metadata are not sent.

        Returns:
            Edge ID if successful, None otherwise.
        """
        command = [
            "causal", "add-edge",
            edge.cause,
            edge.effect,
            str(edge.uplift),
        ]
        if edge.sample_size is not None:
            # sample_size follows confidence positionally, so confidence must
            # be sent too or sample_size would be read as the confidence.
            command.extend([str(edge.confidence), str(edge.sample_size)])
        elif edge.confidence != 0.5:
            command.append(str(edge.confidence))
        try:
            result = self._run_agentdb_command(command)
        except AgentDBCLIException as e:
            logger.error(f"Failed to add causal edge: {e}")
            return None
        return result.get("data", {}).get("edge_id")

    def query_causal_effects(self, cause: Optional[str] = None, effect: Optional[str] = None,
                             min_confidence: float = 0.0, min_uplift: float = 0.0,
                             limit: int = 10) -> List[Dict[str, Any]]:
        """
        Query causal effects from AgentDB.

        Args:
            cause: Cause to query (optional).
            effect: Effect to query (optional).
            min_confidence: Minimum confidence threshold.
            min_uplift: Minimum uplift threshold.
            limit: Maximum number of results.

        Returns:
            List of causal edges, or [] on failure.
        """
        # Both filter slots are always sent so the numeric thresholds never
        # shift into the cause/effect positions.
        # NOTE(review): assumes the CLI treats "" as "no filter" — confirm.
        command = [
            "causal", "query",
            cause or "",
            effect or "",
            str(min_confidence),
            str(min_uplift),
            str(limit),
        ]
        try:
            result = self._run_agentdb_command(command)
        except AgentDBCLIException as e:
            logger.error(f"Failed to query causal effects: {e}")
            return []
        return self._parse_causal_edges_output(result.get("raw_output", ""))

    def _parse_causal_edges_output(self, output: str) -> List[Dict[str, Any]]:
        """Parse causal edges from AgentDB output.

        Recognises lines like
        ``use_template → agent_quality (uplift: 0.25, confidence: 0.95)``
        (``->`` is accepted as well). A missing uplift or confidence is
        reported as 0.0.
        """
        edges: List[Dict[str, Any]] = []
        for line in self._strip_ansi(output).split("\n"):
            if "uplift" not in line.lower():
                continue
            arrow = next((a for a in self._CAUSAL_ARROWS if a in line), None)
            if arrow is None:
                continue
            cause, effect_rest = line.split(arrow, 1)
            uplift_match = self._UPLIFT_RE.search(effect_rest)
            confidence_match = self._CONFIDENCE_RE.search(effect_rest)
            edges.append({
                "cause": cause.strip(),
                "effect": effect_rest.split("(")[0].strip(),
                "uplift": float(uplift_match.group(1)) if uplift_match else 0.0,
                "confidence": float(confidence_match.group(1)) if confidence_match else 0.0,
            })
        return edges

    # Database Methods

    def get_database_stats(self) -> Dict[str, Any]:
        """Get AgentDB database statistics, or {} on failure."""
        try:
            result = self._run_agentdb_command(["db", "stats"])
        except AgentDBCLIException as e:
            logger.error(f"Failed to get database stats: {e}")
            return {}
        return self._parse_database_stats(result.get("raw_output", ""))

    def _parse_database_stats(self, output: str) -> Dict[str, Any]:
        """Parse ``key: count`` lines for the counters listed in ``_STAT_KEYS``.

        Thousands separators are ignored. Non-integer values are skipped.
        """
        stats: Dict[str, Any] = {}
        for line in self._strip_ansi(output).split("\n"):
            if ":" not in line:
                continue
            key, value = line.split(":", 1)
            key = key.strip()
            value = value.strip().replace(",", "")
            for name in self._STAT_KEYS:
                if key.startswith(name):
                    try:
                        stats[name] = int(value)
                    except ValueError:
                        pass
                    break
        return stats

    # Enhanced Methods for Agent-Creator Integration

    def enhance_agent_creation(self, user_input: str, domain: Optional[str] = None) -> Dict[str, Any]:
        """
        Enhance agent creation using AgentDB real capabilities.

        This method integrates multiple AgentDB features to provide
        intelligent enhancement while maintaining the "invisible" experience.
        It returns empty collections when the CLI is unavailable and never raises.
        """
        enhancement: Dict[str, Any] = {
            "templates": [],
            "skills": [],
            "episodes": [],
            "causal_insights": [],
            "recommendations": [],
        }
        if not self.is_available:
            return enhancement
        try:
            # 1. Search for relevant skills
            skills = self.search_skills(user_input, k=3, min_success_rate=0.7)
            enhancement["skills"] = skills
            # 2. Retrieve relevant episodes
            episodes = self.retrieve_episodes(user_input, k=5, min_reward=0.6)
            enhancement["episodes"] = episodes
            # 3. Query causal effects
            if domain:
                enhancement["causal_insights"] = self.query_causal_effects(
                    cause=f"use_{domain}_template",
                    min_confidence=0.7,
                    min_uplift=0.1,
                )
            # 4. Generate recommendations
            enhancement["recommendations"] = self._generate_recommendations(
                user_input, enhancement
            )
            logger.info(f"AgentDB enhancement completed: {len(skills)} skills, {len(episodes)} episodes")
        except Exception as e:
            # Best-effort boundary: enhancement must never break agent creation.
            logger.error(f"AgentDB enhancement failed: {e}")
        return enhancement

    def _generate_recommendations(self, user_input: str, enhancement: Dict[str, Any]) -> List[str]:
        """Generate human-readable recommendations from collected AgentDB data."""
        recommendations: List[str] = []
        # Skill-based recommendations
        if enhancement["skills"]:
            recommendations.append(
                f"Found {len(enhancement['skills'])} relevant skills from AgentDB"
            )
        # Episode-based recommendations
        successful_episodes = [e for e in enhancement["episodes"] if e.get("success", False)]
        if successful_episodes:
            recommendations.append(
                f"Found {len(successful_episodes)} successful similar attempts"
            )
        # Causal insights: report the edge with the largest uplift.
        best_effect = max(enhancement["causal_insights"],
                          key=lambda x: x.get("uplift", 0),
                          default=None)
        if best_effect:
            recommendations.append(
                f"Causal insight: {best_effect['cause']} improves {best_effect['effect']} by {best_effect['uplift']:.1%}"
            )
        return recommendations
# Global instance for backward compatibility: lazily created by
# get_real_agentdb_bridge() and shared by the module-level helpers.
_agentdb_bridge = None


def get_real_agentdb_bridge(db_path: Optional[str] = None) -> "RealAgentDBBridge":
    """Return the shared AgentDB bridge, creating it on first use.

    Args:
        db_path: Database path, used only when the bridge is first created.

    Returns:
        The shared ``RealAgentDBBridge`` instance.

    Later calls always return the existing instance. If such a call asks for
    a different ``db_path``, that request is ignored and a warning is logged.
    """
    global _agentdb_bridge
    if _agentdb_bridge is None:
        _agentdb_bridge = RealAgentDBBridge(db_path)
    elif db_path is not None and os.path.abspath(db_path) != os.path.abspath(_agentdb_bridge.db_path):
        logger.warning(
            f"AgentDB bridge already initialised with {_agentdb_bridge.db_path!r}; "
            f"ignoring requested db_path {db_path!r}"
        )
    return _agentdb_bridge
def is_agentdb_available() -> bool:
    """Report whether the AgentDB CLI can be used from this process.

    Never raises for ordinary errors: any ``Exception`` while creating or
    querying the shared bridge is reported as "not available".
    """
    try:
        return bool(get_real_agentdb_bridge().is_available)
    except Exception:
        # Narrowed from a bare ``except:`` so KeyboardInterrupt / SystemExit
        # still propagate.
        return False