Files
gh-human-frontier-labs-inc-…/scripts/orchestrate_gummy.py
2025-11-29 18:47:38 +08:00

460 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Orchestrate gummy-agent tasks across distributed network using 'd' command.
Handles load balancing, task distribution, and network-wide coordination.
"""
import subprocess
import json
import os
import re
from typing import Dict, List, Optional, Tuple
from datetime import datetime
def run_d_command(cmd: str) -> Tuple[str, int]:
"""
Execute 'dw' command and return output.
Args:
cmd: Full command string (e.g., "dw status")
Returns:
Tuple of (stdout, returncode)
"""
try:
result = subprocess.run(
cmd,
shell=True,
capture_output=True,
text=True,
timeout=30
)
return result.stdout, result.returncode
except subprocess.TimeoutExpired:
return "", -1
except Exception as e:
return f"Error: {e}", -1
def get_load_metrics() -> Dict:
"""
Get load metrics from 'dw load' command.
Returns:
Dict mapping host -> metrics:
{
'host1': {'cpu': 15, 'mem': 45, 'load': 0.23, 'score': 0.28},
'host2': {...},
}
"""
output, code = run_d_command("dw load")
if code != 0:
return {}
metrics = {}
# Parse dw load output
# Expected format:
# host1 CPU: 15% MEM: 45% LOAD: 0.23 SCORE: 0.28
for line in output.strip().split('\n'):
if not line.strip():
continue
# Extract host and metrics
match = re.match(r'(\S+)\s+CPU:\s*(\d+)%\s+MEM:\s*(\d+)%\s+LOAD:\s*([\d.]+)\s+SCORE:\s*([\d.]+)', line)
if match:
host, cpu, mem, load, score = match.groups()
metrics[host] = {
'cpu': int(cpu),
'mem': int(mem),
'load': float(load),
'score': float(score)
}
return metrics
def get_host_status() -> Dict:
"""
Get host availability from 'dw status' command.
Returns:
Dict mapping host -> status:
{
'host1': {'status': 'online', 'ip': '100.1.2.3'},
'host2': {'status': 'offline'},
}
"""
output, code = run_d_command("dw status")
if code != 0:
return {}
status = {}
# Parse dw status output
# Expected format:
# host1 online 100.1.2.3
# host2 offline -
for line in output.strip().split('\n'):
if not line.strip():
continue
parts = line.split()
if len(parts) >= 2:
host = parts[0]
state = parts[1]
ip = parts[2] if len(parts) > 2 else None
status[host] = {
'status': state,
'ip': ip
}
return status
def select_optimal_host(
task_type: Optional[str] = None,
exclude: Optional[List[str]] = None
) -> Optional[Dict]:
"""
Select the best host for a task based on load metrics.
Args:
task_type: Type of task ('database', 'api', 'frontend', etc.)
exclude: List of hosts to exclude from selection
Returns:
Dict with selected host info:
{
'host': 'node-1',
'score': 0.23,
'cpu': 15,
'mem': 45,
'load': 0.23
}
Example:
>>> optimal = select_optimal_host(task_type="database")
>>> print(f"Selected: {optimal['host']}")
Selected: node-1
"""
exclude = exclude or []
# Get current load metrics
load_data = get_load_metrics()
# Get host availability
status_data = get_host_status()
# Filter to online hosts not in exclude list
available_hosts = [
host for host, info in status_data.items()
if info['status'] == 'online' and host not in exclude
]
if not available_hosts:
return None
# Filter load data to available hosts
available_load = {
host: metrics
for host, metrics in load_data.items()
if host in available_hosts
}
if not available_load:
# No load data, pick first available
host = available_hosts[0]
return {
'host': host,
'score': 0.5, # Unknown
'cpu': None,
'mem': None,
'load': None
}
# Select host with lowest score (best performance)
best_host = min(available_load.keys(), key=lambda h: available_load[h]['score'])
best_metrics = available_load[best_host]
return {
'host': best_host,
'score': best_metrics['score'],
'cpu': best_metrics['cpu'],
'mem': best_metrics['mem'],
'load': best_metrics['load']
}
def sync_codebase(host: str, local_path: str, remote_path: Optional[str] = None) -> bool:
"""
Sync codebase to remote host using 'dw sync'.
Args:
host: Target host name
local_path: Local directory to sync
remote_path: Remote path (defaults to same as local)
Returns:
True if sync successful
"""
if remote_path is None:
remote_path = local_path
# Change to local path
original_dir = os.getcwd()
try:
os.chdir(local_path)
# Execute dw sync
output, code = run_d_command(f"dw sync {host}")
return code == 0
finally:
os.chdir(original_dir)
def execute_remote_gummy(
host: str,
task: str,
project_path: str,
sync_first: bool = True
) -> Dict:
"""
Execute gummy task on remote host.
Args:
host: Target host name
task: Gummy task description
project_path: Path to project directory
sync_first: Whether to sync codebase before executing
Returns:
Dict with execution info:
{
'host': 'node-1',
'task': 'optimize queries',
'command': 'cd ... && gummy task ...',
'synced': True,
'launched': True
}
"""
result = {
'host': host,
'task': task,
'synced': False,
'launched': False
}
# Sync if requested
if sync_first:
result['synced'] = sync_codebase(host, project_path)
if not result['synced']:
return result
# Build gummy command
gummy_cmd = f'cd {project_path} && gummy task "{task}"'
# Execute on remote host
full_cmd = f'dw run {host} \'{gummy_cmd}\''
result['command'] = full_cmd
output, code = run_d_command(full_cmd)
result['launched'] = (code == 0)
result['output'] = output
return result
def sync_and_execute_gummy(
host: str,
task: str,
project_dir: str
) -> Dict:
"""
Convenience function: sync codebase and execute gummy task.
Args:
host: Target host name
task: Gummy task description
project_dir: Project directory path
Returns:
Execution result dict
"""
return execute_remote_gummy(host, task, project_dir, sync_first=True)
def parallel_gummy_tasks(tasks: List[Dict]) -> List[Dict]:
"""
Execute multiple gummy tasks in parallel across hosts.
Args:
tasks: List of task dicts:
[
{'host': 'node-1', 'task': 'task1', 'project': '/path'},
{'host': 'node-2', 'task': 'task2', 'project': '/path'},
]
Returns:
List of result dicts
"""
results = []
for task_info in tasks:
result = execute_remote_gummy(
host=task_info['host'],
task=task_info['task'],
project_path=task_info['project'],
sync_first=task_info.get('sync', True)
)
results.append(result)
return results
def monitor_all_specialists() -> Dict:
"""
Get status of all specialists across all hosts.
Returns:
Dict mapping host -> specialists:
{
'node-1': [
{'name': 'database-expert', 'status': 'active', 'sessions': 8},
{'name': 'api-developer', 'status': 'dormant', 'sessions': 3},
],
'node-2': [...],
}
"""
status_data = get_host_status()
online_hosts = [h for h, info in status_data.items() if info['status'] == 'online']
all_specialists = {}
for host in online_hosts:
# Check if gummy is installed
check_cmd = f'dw run {host} "command -v gummy"'
output, code = run_d_command(check_cmd)
if code != 0:
continue
# List specialists
list_cmd = f'dw run {host} "ls -1 ~/.gummy/specialists 2>/dev/null || echo"'
output, code = run_d_command(list_cmd)
if code != 0 or not output.strip():
continue
specialists = []
for spec_name in output.strip().split('\n'):
if not spec_name:
continue
# Get specialist metadata
meta_cmd = f'dw run {host} "cat ~/.gummy/specialists/{spec_name}/meta.yaml 2>/dev/null"'
meta_output, meta_code = run_d_command(meta_cmd)
if meta_code == 0:
# Parse YAML (simple key: value format)
spec_info = {'name': spec_name}
for line in meta_output.split('\n'):
if ':' in line:
key, value = line.split(':', 1)
spec_info[key.strip()] = value.strip()
specialists.append(spec_info)
if specialists:
all_specialists[host] = specialists
return all_specialists
def comprehensive_distributed_report() -> Dict:
"""
Generate comprehensive report of entire distributed network.
Returns:
Dict with complete network state:
{
'timestamp': '2025-10-19T22:00:00',
'hosts': {...},
'load': {...},
'specialists': {...},
'summary': 'Summary text'
}
"""
report = {
'timestamp': datetime.now().isoformat(),
'hosts': get_host_status(),
'load': get_load_metrics(),
'specialists': monitor_all_specialists()
}
# Generate summary
online_count = sum(1 for h in report['hosts'].values() if h['status'] == 'online')
total_count = len(report['hosts'])
specialist_count = sum(len(specs) for specs in report['specialists'].values())
report['summary'] = (
f"{online_count}/{total_count} hosts online, "
f"{specialist_count} active specialists"
)
return report
def main():
"""Test orchestration functions."""
print("=" * 70)
print("DISTRIBUTED GUMMY ORCHESTRATOR - TEST")
print("=" * 70)
# Test 1: Load metrics
print("\n✓ Testing load metrics...")
load = get_load_metrics()
for host, metrics in load.items():
print(f" {host}: CPU {metrics['cpu']}%, Score {metrics['score']}")
# Test 2: Host status
print("\n✓ Testing host status...")
status = get_host_status()
for host, info in status.items():
print(f" {host}: {info['status']}")
# Test 3: Optimal host selection
print("\n✓ Testing optimal host selection...")
optimal = select_optimal_host()
if optimal:
print(f" Best host: {optimal['host']} (score: {optimal['score']})")
else:
print(" No hosts available")
# Test 4: Specialist monitoring
print("\n✓ Testing specialist monitoring...")
specialists = monitor_all_specialists()
for host, specs in specialists.items():
print(f" {host}: {len(specs)} specialists")
for spec in specs:
print(f" - {spec['name']}")
# Test 5: Comprehensive report
print("\n✓ Testing comprehensive report...")
report = comprehensive_distributed_report()
print(f" {report['summary']}")
print("\n" + "=" * 70)
print("✅ ALL TESTS COMPLETE")
print("=" * 70)
if __name__ == "__main__":
main()