717 lines
24 KiB
Python
717 lines
24 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Real AgentDB Integration - TypeScript/Python Bridge
|
|
|
|
This module provides real integration with AgentDB CLI, handling the TypeScript/Python
|
|
communication barrier while maintaining the "invisible intelligence" experience.
|
|
|
|
Architecture: Python <-> CLI Bridge <-> AgentDB (TypeScript/Node.js)
|
|
"""
|
|
|
|
import json
import logging
import os
import re
import subprocess
import tempfile
import time
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, List, Optional, Union
|
|
|
|
# Module-level logger; every diagnostic emitted by this bridge goes through it.
logger: logging.Logger = logging.getLogger(__name__)
|
|
|
|
@dataclass
class Episode:
    """Python-side mirror of an AgentDB reflexion Episode record.

    Only ``session_id`` and ``task`` are required; every other field has a
    neutral default (``None``, ``0.0`` or ``False``).
    """

    # Required identifiers (positional).
    session_id: str
    task: str

    # Free-text context of the attempt.
    input: Union[str, None] = None
    output: Union[str, None] = None
    critique: Union[str, None] = None

    # Outcome of the attempt.
    reward: float = 0.0
    success: bool = False

    # Optional performance numbers.
    latency_ms: Union[int, None] = None
    tokens_used: Union[int, None] = None

    # Arbitrary annotations.
    tags: Union[List[str], None] = None
    metadata: Union[Dict[str, Any], None] = None
|
|
|
|
@dataclass
class Skill:
    """Python-side mirror of an AgentDB Skill record.

    Only ``name`` is required; statistics default to zero and optional
    payloads to ``None``.
    """

    # Required identifier.
    name: str

    # Human-readable description and the skill's code, if any.
    description: Union[str, None] = None
    code: Union[str, None] = None
    signature: Union[Dict[str, Any], None] = None

    # Usage statistics.
    success_rate: float = 0.0
    uses: int = 0
    avg_reward: float = 0.0
    avg_latency_ms: int = 0

    # Arbitrary annotations.
    metadata: Union[Dict[str, Any], None] = None
|
|
|
|
@dataclass
class CausalEdge:
    """Python-side mirror of an AgentDB CausalEdge (cause -> effect).

    ``cause``, ``effect`` and ``uplift`` are required; ``confidence``
    defaults to 0.5 and the remaining fields to ``None``.
    """

    # Required: the relationship and its measured uplift.
    cause: str
    effect: str
    uplift: float

    # Strength of the evidence.
    confidence: float = 0.5
    sample_size: Union[int, None] = None

    # Optional explanation and annotations.
    mechanism: Union[str, None] = None
    metadata: Union[Dict[str, Any], None] = None
|
|
|
|
class AgentDBCLIException(Exception):
    """Signals that an AgentDB CLI invocation could not be completed successfully."""
|
|
|
|
class RealAgentDBBridge:
    """
    Real bridge to AgentDB CLI, providing Python interface while maintaining
    the "invisible intelligence" experience for users.

    Each operation runs the ``agentdb`` executable in a subprocess (with
    ``AGENTDB_PATH`` set to ``db_path``) and scrapes its text output. The
    public methods do not raise on CLI failures: they log the error and
    return ``None``, ``[]`` or ``{}`` instead.
    """

    # ANSI CSI escape sequences (colours, bold, resets) embedded in the CLI's
    # human-readable output; stripped before any text is parsed.
    _ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-?]*[ -/]*[@-~]")

    # "#<n>:" ordinal in front of each skill name in skill-search output.
    _SKILL_HEADER_PREFIX_RE = re.compile(r"^#\d+:\s*")

    # Numbers following "uplift:" / "confidence:" in causal-query output.
    _UPLIFT_RE = re.compile(
        r"uplift:\s*([-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?)", re.IGNORECASE
    )
    _CONFIDENCE_RE = re.compile(
        r"confidence:\s*([-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?)", re.IGNORECASE
    )

    # "Created N ... skills" summary printed by `skill consolidate`.
    _CONSOLIDATED_COUNT_RE = re.compile(r"\bCreated\s+(\d+)\b")

    # Confirmation markers that are followed by the new entity's ID, and the
    # key under which the ID is stored in the parsed result's "data" dict.
    _ID_MARKERS = (
        ("Stored episode #", "episode_id"),
        ("Created skill #", "skill_id"),
        ("Added causal edge #", "edge_id"),
    )

    # Counters recognised in `db stats` output (first prefix match wins).
    _STAT_KEYS = ("causal_edges", "episodes", "causal_experiments")

    def __init__(self, db_path: Optional[str] = None):
        """
        Initialize the real AgentDB bridge.

        Args:
            db_path: Path to AgentDB database file (default: ./agentdb.db)
        """
        self.db_path = db_path or "./agentdb.db"
        self.is_available = self._check_agentdb_availability()
        self._setup_environment()

    def _check_agentdb_availability(self) -> bool:
        """Return True if ``agentdb --help`` runs and exits with status 0."""
        try:
            result = subprocess.run(
                ["agentdb", "--help"],
                capture_output=True,
                text=True,
                timeout=10,
            )
        except (subprocess.TimeoutExpired, OSError):
            # OSError covers FileNotFoundError (not installed) as well as
            # PermissionError and other failures to exec the binary.
            logger.warning("AgentDB CLI not available")
            return False
        return result.returncode == 0

    def _setup_environment(self):
        """Build the subprocess environment: a copy of os.environ plus AGENTDB_PATH."""
        env = os.environ.copy()
        env["AGENTDB_PATH"] = self.db_path
        self.env = env

    def _run_agentdb_command(self, command: List[str], timeout: int = 30) -> Dict[str, Any]:
        """
        Execute AgentDB CLI command and parse output.

        Args:
            command: Arguments passed after the ``agentdb`` executable
            timeout: Command timeout in seconds

        Returns:
            Parsed result dictionary (see ``_parse_agentdb_output``)

        Raises:
            AgentDBCLIException: If the CLI is unavailable, times out, cannot
                be executed, exits non-zero, or its output cannot be parsed
        """
        if not self.is_available:
            raise AgentDBCLIException("AgentDB CLI not available")

        try:
            full_command = ["agentdb"] + command
            logger.debug("Running AgentDB command: %s", " ".join(full_command))
            result = subprocess.run(
                full_command,
                capture_output=True,
                text=True,
                timeout=timeout,
                env=self.env,
            )
        except subprocess.TimeoutExpired as e:
            raise AgentDBCLIException(f"AgentDB command timed out: {' '.join(command)}") from e
        except Exception as e:
            raise AgentDBCLIException(f"Error executing AgentDB command: {str(e)}") from e

        if result.returncode != 0:
            # Raised outside the try above so the message is not re-wrapped as
            # "Error executing AgentDB command: AgentDB command failed: ...".
            error_msg = f"AgentDB command failed: {result.stderr}"
            logger.error(error_msg)
            raise AgentDBCLIException(error_msg)

        try:
            # Most AgentDB commands print structured, human-readable text.
            return self._parse_agentdb_output(result.stdout)
        except Exception as e:
            raise AgentDBCLIException(f"Error executing AgentDB command: {str(e)}") from e

    @classmethod
    def _strip_ansi(cls, text: str) -> str:
        """Return ``text`` with all ANSI CSI escape sequences removed."""
        return cls._ANSI_ESCAPE_RE.sub("", text)

    @staticmethod
    def _positional_tail(values: List[Optional[str]]) -> List[str]:
        """
        Render optional trailing positional CLI arguments.

        Missing (None or empty) values at the end are dropped. Missing values
        before a present one become "" so later values keep their own slot
        instead of shifting into an earlier parameter.
        """
        args = [value if value else "" for value in values]
        while args and not args[-1]:
            args.pop()
        return args

    @staticmethod
    def _extract_hash_id(lines: List[str], marker: str) -> Optional[Union[int, str]]:
        """
        Return the ID printed right after ``marker`` on the first line that
        contains it, as an int when numeric, otherwise as the raw token.

        Returns None when no line contains the marker or nothing follows it.
        """
        for line in lines:
            if marker in line:
                tokens = line.split(marker, 1)[1].split()
                if not tokens:
                    return None
                token = tokens[0].rstrip(".,;:!)")
                try:
                    return int(token)
                except ValueError:
                    return token
        return None

    def _parse_agentdb_output(self, output: str) -> Dict[str, Any]:
        """
        Parse AgentDB CLI output into structured data.

        ANSI escape sequences are removed before parsing. If any output line
        is a JSON object, the first one that decodes is returned as-is.
        Otherwise the result is ``{"raw_output": output, "success": True,
        "data": {...}}``. ``raw_output`` is the unmodified CLI output. ``data``
        holds whatever was recognised: an ``episode_id``, ``skill_id`` or
        ``edge_id`` from a "... #<id>" confirmation line, or ``episodes`` /
        ``skills`` lists from retrieval/search output.

        This is a heuristic text parser; it only understands the message
        shapes matched below.
        """
        clean = self._strip_ansi(output)
        lines = clean.strip().split('\n')

        # Prefer machine-readable JSON when the command printed any.
        for line in lines:
            line = line.strip()
            if line.startswith('{') and line.endswith('}'):
                try:
                    return json.loads(line)
                except json.JSONDecodeError:
                    continue

        result: Dict[str, Any] = {
            "raw_output": output,
            "success": True,
            "data": {},
        }
        data = result["data"]

        # Creation confirmations, e.g. "Stored episode #42".
        for marker, key in self._ID_MARKERS:
            if marker in clean:
                entity_id = self._extract_hash_id(lines, marker)
                if entity_id is not None:
                    data[key] = entity_id
                return result

        if "Retrieved" in clean and "relevant episodes" in clean:
            data["episodes"] = self._parse_episodes_output(clean)
        elif "Found" in clean and "matching skills" in clean:
            data["skills"] = self._parse_skills_output(clean)

        return result

    def _parse_episodes_output(self, output: str) -> List[Dict[str, Any]]:
        """
        Parse episodes from AgentDB retrieval output.

        A line starting with "#" and containing "Episode" opens a new episode;
        its second whitespace token (minus ":") becomes ``episode_id``.
        Following "Key: value" lines fill in task, reward, success,
        similarity and critique. Unparseable numbers are skipped.
        """
        episodes: List[Dict[str, Any]] = []
        current_episode: Dict[str, Any] = {}

        for raw_line in self._strip_ansi(output).split('\n'):
            line = raw_line.strip()
            if line.startswith("#") and "Episode" in line:
                if current_episode:
                    episodes.append(current_episode)
                tokens = line.split()
                # Guard against a bare "#Episode" header with no ID token.
                episode_id = tokens[1] if len(tokens) > 1 else ""
                current_episode = {"episode_id": episode_id.replace(":", "")}
            elif ":" in line and current_episode:
                key, value = line.split(":", 1)
                key = key.strip()
                value = value.strip()

                if key == "Task":
                    current_episode["task"] = value
                elif key == "Reward":
                    try:
                        current_episode["reward"] = float(value)
                    except ValueError:
                        pass
                elif key == "Success":
                    current_episode["success"] = "Yes" in value
                elif key == "Similarity":
                    try:
                        current_episode["similarity"] = float(value)
                    except ValueError:
                        pass
                elif key == "Critique":
                    current_episode["critique"] = value

        if current_episode:
            episodes.append(current_episode)

        return episodes

    def _parse_skills_output(self, output: str) -> List[Dict[str, Any]]:
        """
        Parse skills from AgentDB search output.

        A line starting with "#" opens a new skill; its "#<n>:" ordinal is
        removed to obtain the name. Following "Key: value" lines fill in
        description, success_rate (a percentage converted to 0..1), uses and
        avg_reward. Unparseable numbers are skipped.
        """
        skills: List[Dict[str, Any]] = []
        current_skill: Dict[str, Any] = {}

        for raw_line in self._strip_ansi(output).split('\n'):
            line = raw_line.strip()
            if line.startswith("#"):
                if current_skill:
                    skills.append(current_skill)
                # Strip any "#<n>:" ordinal; previously only "#1:" was removed,
                # so every later result kept "#2: ..." etc. in its name.
                skill_name = self._SKILL_HEADER_PREFIX_RE.sub("", line).strip()
                current_skill = {"name": skill_name}
            elif ":" in line and current_skill:
                key, value = line.split(":", 1)
                key = key.strip()
                value = value.strip()

                if key == "Description":
                    current_skill["description"] = value
                elif key == "Success Rate":
                    try:
                        current_skill["success_rate"] = float(value.replace("%", "")) / 100
                    except ValueError:
                        pass
                elif key == "Uses":
                    try:
                        current_skill["uses"] = int(value)
                    except ValueError:
                        pass
                elif key == "Avg Reward":
                    try:
                        current_skill["avg_reward"] = float(value)
                    except ValueError:
                        pass

        if current_skill:
            skills.append(current_skill)

        return skills

    # Reflexion Memory Methods

    def store_episode(self, episode: Episode) -> Optional[int]:
        """
        Store a reflexion episode in AgentDB.

        Args:
            episode: Episode to store

        Returns:
            Episode ID if successful, None otherwise
        """
        try:
            command = [
                "reflexion", "store",
                episode.session_id,
                episode.task,
                str(episode.reward),
                "true" if episode.success else "false",
            ]

            # Optional trailing positionals, in the order this bridge has
            # always sent them. Gaps become "" so e.g. an input without a
            # critique is no longer sent in the critique slot. Numeric 0 is a
            # real value and is now passed instead of being dropped.
            command.extend(self._positional_tail([
                episode.critique,
                episode.input,
                episode.output,
                None if episode.latency_ms is None else str(episode.latency_ms),
                None if episode.tokens_used is None else str(episode.tokens_used),
            ]))

            result = self._run_agentdb_command(command)
            return result.get("data", {}).get("episode_id")

        except AgentDBCLIException as e:
            logger.error(f"Failed to store episode: {e}")
            return None

    def retrieve_episodes(self, task: str, k: int = 5, min_reward: float = 0.0,
                          only_failures: bool = False, only_successes: bool = False) -> List[Dict[str, Any]]:
        """
        Retrieve relevant episodes from AgentDB.

        Args:
            task: Task description
            k: Maximum number of episodes to retrieve
            min_reward: Minimum reward threshold
            only_failures: Only retrieve failed episodes (takes precedence)
            only_successes: Only retrieve successful episodes

        Returns:
            List of episodes
        """
        try:
            command = ["reflexion", "retrieve", task, str(k), str(min_reward)]

            if only_failures:
                command.append("true")
            elif only_successes:
                # NOTE(review): assumes the CLI takes [only-failures]
                # [only-successes] as consecutive positionals. Sending just
                # "false" (as before) only disabled the failure filter and
                # never requested successes.
                command.extend(["false", "true"])

            result = self._run_agentdb_command(command)
            return result.get("data", {}).get("episodes", [])

        except AgentDBCLIException as e:
            logger.error(f"Failed to retrieve episodes: {e}")
            return []

    def get_critique_summary(self, task: str, only_failures: bool = False) -> Optional[str]:
        """
        Get the critique summary for a task.

        Returns the text after the last "═" separator in the CLI output, with
        ANSI escapes removed. Returns None if the command fails.
        """
        try:
            command = ["reflexion", "critique-summary", task]
            if only_failures:
                command.append("true")

            result = self._run_agentdb_command(command)
            # The summary is the last "═"-separated section of the raw output.
            raw_output = self._strip_ansi(result.get("raw_output", ""))
            return raw_output.split("═")[-1].strip()

        except AgentDBCLIException as e:
            logger.error(f"Failed to get critique summary: {e}")
            return None

    # Skill Library Methods

    def create_skill(self, skill: Skill) -> Optional[int]:
        """
        Create a skill in AgentDB.

        Args:
            skill: Skill to create

        Returns:
            Skill ID if successful, None otherwise
        """
        try:
            command = ["skill", "create", skill.name]
            # Keep code in its own slot even when there is no description.
            command.extend(self._positional_tail([skill.description, skill.code]))

            result = self._run_agentdb_command(command)
            return result.get("data", {}).get("skill_id")

        except AgentDBCLIException as e:
            logger.error(f"Failed to create skill: {e}")
            return None

    def search_skills(self, query: str, k: int = 5, min_success_rate: float = 0.0) -> List[Dict[str, Any]]:
        """
        Search for skills in AgentDB.

        Args:
            query: Search query
            k: Maximum number of skills to retrieve (before filtering)
            min_success_rate: Minimum success rate threshold (0..1). Applied
                client-side. Skills whose success rate could not be parsed
                are kept.

        Returns:
            List of skills
        """
        try:
            command = ["skill", "search", query, str(k)]

            result = self._run_agentdb_command(command)
            skills = result.get("data", {}).get("skills", [])

            if min_success_rate > 0:
                # Previously min_success_rate was accepted but never applied.
                skills = [
                    skill for skill in skills
                    if skill.get("success_rate") is None
                    or skill["success_rate"] >= min_success_rate
                ]
            return skills

        except AgentDBCLIException as e:
            logger.error(f"Failed to search skills: {e}")
            return []

    def consolidate_skills(self, min_attempts: int = 3, min_reward: float = 0.7,
                           time_window_days: int = 7) -> Optional[int]:
        """
        Consolidate episodes into skills.

        Args:
            min_attempts: Minimum number of attempts
            min_reward: Minimum reward threshold
            time_window_days: Time window in days

        Returns:
            Number of skills created if successful (0 when no count is found
            in the output), None otherwise
        """
        try:
            command = [
                "skill", "consolidate",
                str(min_attempts),
                str(min_reward),
                str(time_window_days),
            ]

            result = self._run_agentdb_command(command)
            output = self._strip_ansi(result.get("raw_output", ""))
            # Look for e.g. "Created 3 skills" / "Created 1 skill".
            for line in output.split('\n'):
                if "Created" in line and "skill" in line:
                    match = self._CONSOLIDATED_COUNT_RE.search(line)
                    if match:
                        return int(match.group(1))
            return 0

        except AgentDBCLIException as e:
            logger.error(f"Failed to consolidate skills: {e}")
            return None

    # Causal Memory Methods

    def add_causal_edge(self, edge: CausalEdge) -> Optional[int]:
        """
        Add a causal edge to AgentDB.

        Args:
            edge: Causal edge to add

        Returns:
            Edge ID if successful, None otherwise
        """
        try:
            command = [
                "causal", "add-edge",
                edge.cause,
                edge.effect,
                str(edge.uplift),
            ]

            # Confidence must also be sent when a sample size follows it;
            # otherwise the sample size would be read as the confidence.
            if edge.confidence != 0.5 or edge.sample_size is not None:
                command.append(str(edge.confidence))
            if edge.sample_size is not None:
                command.append(str(edge.sample_size))

            result = self._run_agentdb_command(command)
            return result.get("data", {}).get("edge_id")

        except AgentDBCLIException as e:
            logger.error(f"Failed to add causal edge: {e}")
            return None

    def query_causal_effects(self, cause: Optional[str] = None, effect: Optional[str] = None,
                             min_confidence: float = 0.0, min_uplift: float = 0.0,
                             limit: int = 10) -> List[Dict[str, Any]]:
        """
        Query causal effects from AgentDB.

        Args:
            cause: Cause to query (None = any)
            effect: Effect to query (None = any)
            min_confidence: Minimum confidence threshold
            min_uplift: Minimum uplift threshold
            limit: Maximum number of results

        Returns:
            List of causal edges
        """
        try:
            # cause and effect always occupy their positional slots ("" when
            # unset). Previously, omitting them shifted min_confidence into
            # the cause slot.
            command = [
                "causal", "query",
                cause or "",
                effect or "",
                str(min_confidence),
                str(min_uplift),
                str(limit),
            ]

            result = self._run_agentdb_command(command)
            return self._parse_causal_edges_output(result.get("raw_output", ""))

        except AgentDBCLIException as e:
            logger.error(f"Failed to query causal effects: {e}")
            return []

    def _parse_causal_edges_output(self, output: str) -> List[Dict[str, Any]]:
        """
        Parse causal edges from AgentDB output.

        Handles lines like ``use_template → agent_quality (uplift: 0.25,
        confidence: 0.95)``. A missing or unparseable uplift/confidence is
        reported as 0.0.
        """
        edges: List[Dict[str, Any]] = []

        for line in self._strip_ansi(output).split('\n'):
            if "→" not in line or "uplift" not in line.lower():
                continue

            cause_part, effect_rest = line.split("→", 1)
            uplift_match = self._UPLIFT_RE.search(effect_rest)
            confidence_match = self._CONFIDENCE_RE.search(effect_rest)

            edges.append({
                "cause": cause_part.strip(),
                "effect": effect_rest.split("(")[0].strip(),
                "uplift": float(uplift_match.group(1)) if uplift_match else 0.0,
                "confidence": float(confidence_match.group(1)) if confidence_match else 0.0,
            })

        return edges

    # Database Methods

    def get_database_stats(self) -> Dict[str, Any]:
        """Get AgentDB database statistics ({} if the command fails)."""
        try:
            result = self._run_agentdb_command(["db", "stats"])
            return self._parse_database_stats(result.get("raw_output", ""))

        except AgentDBCLIException as e:
            logger.error(f"Failed to get database stats: {e}")
            return {}

    def _parse_database_stats(self, output: str) -> Dict[str, Any]:
        """
        Parse database statistics from AgentDB output.

        For each "key: value" line whose key starts with one of the known
        counters, the first token of the value is parsed as an int
        (thousands separators allowed). Other lines are ignored.
        """
        stats: Dict[str, Any] = {}

        for line in self._strip_ansi(output).split('\n'):
            if ":" not in line:
                continue
            key, value = line.split(":", 1)
            key = key.strip()
            tokens = value.split()
            if not tokens:
                continue

            for stat_name in self._STAT_KEYS:
                if key.startswith(stat_name):
                    try:
                        stats[stat_name] = int(tokens[0].replace(",", ""))
                    except ValueError:
                        pass
                    break

        return stats

    # Enhanced Methods for Agent-Creator Integration

    def enhance_agent_creation(self, user_input: str, domain: Optional[str] = None) -> Dict[str, Any]:
        """
        Enhance agent creation using AgentDB real capabilities.

        This method integrates multiple AgentDB features to provide
        intelligent enhancement while maintaining the "invisible" experience.
        It never raises: if AgentDB is unavailable or a step fails, it returns
        whatever was collected so far (possibly all-empty lists).
        """
        enhancement = {
            "templates": [],
            "skills": [],
            "episodes": [],
            "causal_insights": [],
            "recommendations": [],
        }

        if not self.is_available:
            return enhancement

        try:
            # 1. Search for relevant skills
            skills = self.search_skills(user_input, k=3, min_success_rate=0.7)
            enhancement["skills"] = skills

            # 2. Retrieve relevant episodes
            episodes = self.retrieve_episodes(user_input, k=5, min_reward=0.6)
            enhancement["episodes"] = episodes

            # 3. Query causal effects
            if domain:
                causal_effects = self.query_causal_effects(
                    cause=f"use_{domain}_template",
                    min_confidence=0.7,
                    min_uplift=0.1,
                )
                enhancement["causal_insights"] = causal_effects

            # 4. Generate recommendations
            enhancement["recommendations"] = self._generate_recommendations(
                user_input, enhancement
            )

            logger.info(f"AgentDB enhancement completed: {len(skills)} skills, {len(episodes)} episodes")

        except Exception as e:
            # Best-effort boundary: enhancement must never break agent creation.
            logger.error(f"AgentDB enhancement failed: {e}")

        return enhancement

    def _generate_recommendations(self, user_input: str, enhancement: Dict[str, Any]) -> List[str]:
        """Generate human-readable recommendations from collected AgentDB data."""
        recommendations = []

        # Skill-based recommendations
        if enhancement["skills"]:
            recommendations.append(
                f"Found {len(enhancement['skills'])} relevant skills from AgentDB"
            )

        # Episode-based recommendations
        if enhancement["episodes"]:
            successful_episodes = [e for e in enhancement["episodes"] if e.get("success", False)]
            if successful_episodes:
                recommendations.append(
                    f"Found {len(successful_episodes)} successful similar attempts"
                )

        # Causal insights: report the edge with the largest uplift.
        if enhancement["causal_insights"]:
            best_effect = max(enhancement["causal_insights"],
                              key=lambda x: x.get("uplift", 0),
                              default=None)
            if best_effect:
                recommendations.append(
                    f"Causal insight: {best_effect['cause']} improves {best_effect['effect']} by {best_effect['uplift']:.1%}"
                )

        return recommendations
|
|
|
|
# Global instance for backward compatibility: the process-wide bridge shared
# by every get_real_agentdb_bridge() caller (created lazily on first use).
_agentdb_bridge = None


def get_real_agentdb_bridge(db_path: Optional[str] = None) -> RealAgentDBBridge:
    """
    Get the global real AgentDB bridge instance.

    The first call creates the bridge with ``db_path``. Later calls return
    that same instance. A later call with a different explicit ``db_path``
    still gets the existing bridge, but a warning is logged instead of the
    mismatch being silently ignored.
    """
    global _agentdb_bridge
    if _agentdb_bridge is None:
        _agentdb_bridge = RealAgentDBBridge(db_path)
    elif db_path is not None and db_path != _agentdb_bridge.db_path:
        logger.warning(
            "AgentDB bridge already initialised with db_path=%r; ignoring db_path=%r",
            _agentdb_bridge.db_path,
            db_path,
        )
    return _agentdb_bridge
|
|
|
|
def is_agentdb_available() -> bool:
    """
    Check if AgentDB is available.

    Uses the shared bridge from get_real_agentdb_bridge() and reports its
    ``is_available`` flag. Any error while obtaining the bridge is treated as
    "not available".
    """
    try:
        bridge = get_real_agentdb_bridge()
    except Exception:
        # Best-effort probe: report unavailable rather than raising. Unlike
        # the previous bare `except:`, this lets KeyboardInterrupt and
        # SystemExit propagate.
        logger.debug("AgentDB availability check failed", exc_info=True)
        return False
    return bridge.is_available