Files
2025-11-30 09:03:57 +08:00

199 lines
6.3 KiB
Python

"""Process debater agent outputs."""
from __future__ import annotations
import json
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal
from debate_ops import frontmatter
@dataclass
class ProcessResult:
success: bool
argument_id: str | list[str] | None = None
errors: list[str] | None = None
warnings: list[str] | None = None
# Keys every argument object must provide.
REQUIRED_KEYS = {
    'title',
    'claim',
    'grounds',
    'warrant',
}
# Keys an argument object may additionally provide.
OPTIONAL_KEYS = {
    'backing',
    'qualifier',
    'attacks',
    'defends',
}
# Every recognized key; anything outside this set is reported as a warning.
VALID_KEYS = REQUIRED_KEYS.union(OPTIONAL_KEYS)
def process_debater(
    debate: str,
    side: Literal['proposition', 'opposition'],
    exchange: int,
    output: str | dict | list
) -> ProcessResult:
    """Process debater output, handling both single arguments and lists of arguments.

    Args:
        debate: Debate directory name, forwarded to the per-argument writer.
        side: Which side produced the output.
        exchange: Exchange number, forwarded to the per-argument writer.
        output: The debater output. A string is parsed as JSON after an
            optional surrounding Markdown code fence is stripped; a dict or
            list is used as-is.

    Returns:
        A ProcessResult. For a list input, ``argument_id`` is the list of
        generated IDs and ``warnings`` aggregates the warnings of every
        argument. For a single argument, the per-argument result is returned
        unchanged. Malformed input yields ``success=False`` with an error
        message rather than an exception.
    """
    # Parse input to get data structure
    if isinstance(output, str):
        cleaned = re.sub(r'^```(?:json|yaml)?\s*|\s*```$', '', output.strip(), flags=re.MULTILINE)
        try:
            parsed = json.loads(cleaned)
        except json.JSONDecodeError as e:
            return ProcessResult(success=False, errors=[f"Invalid JSON: {e}"])
    else:
        parsed = output

    if isinstance(parsed, list):
        # Multiple arguments (e.g., opening statements)
        if not parsed:
            return ProcessResult(success=False, errors=["Empty argument list"])

        # Check the shape of every item up front so a malformed entry is
        # reported before any argument has been processed.
        for idx, arg_data in enumerate(parsed):
            if not isinstance(arg_data, dict):
                return ProcessResult(
                    success=False,
                    errors=[f"Argument {idx}: expected an object, got {type(arg_data).__name__}"]
                )

        all_warnings = []
        arg_ids = []
        for idx, arg_data in enumerate(parsed):
            result = _process_single_argument(
                debate=debate,
                side=side,
                exchange=exchange,
                data=arg_data,
                index=idx
            )
            if not result.success:
                # Fail fast on any error. NOTE: arguments earlier in the list
                # have already been processed by this point.
                return result
            arg_ids.append(result.argument_id)
            if result.warnings:
                all_warnings.extend(result.warnings)
        return ProcessResult(
            success=True,
            argument_id=arg_ids,
            warnings=all_warnings or None
        )

    # JSON scalars (strings, numbers, booleans, null) are valid JSON but are
    # not arguments; reject them here instead of crashing further down.
    if not isinstance(parsed, dict):
        return ProcessResult(
            success=False,
            errors=[f"Expected an object or a list of objects, got {type(parsed).__name__}"]
        )

    # Single argument (standard case)
    return _process_single_argument(
        debate=debate,
        side=side,
        exchange=exchange,
        data=parsed,
        index=None
    )
def _process_single_argument(
    debate: str,
    side: Literal['proposition', 'opposition'],
    exchange: int,
    data: dict,
    index: int | None = None
) -> ProcessResult:
    """Process a single argument and create its file. Does not update debate state.

    All validation runs before anything is written, so a failed result means
    no file was created for this argument.

    Args:
        debate: Debate directory name, resolved relative to the current
            working directory.
        side: 'proposition' maps to the 'prop' ID prefix; any other value
            maps to 'opp'.
        exchange: Exchange number, zero-padded to three digits in the ID.
        data: The argument object. Must contain REQUIRED_KEYS; keys outside
            VALID_KEYS only produce a warning.
        index: Position within a batch (0-25, rendered as suffix a-z), or
            None for a single argument with no suffix.

    Returns:
        ProcessResult with ``argument_id`` set on success, or ``errors`` set
        when validation fails.
    """
    def _fail(message: str) -> ProcessResult:
        return ProcessResult(success=False, errors=[message])

    if not isinstance(data, dict):
        return _fail(f"Argument must be an object (found {type(data).__name__})")

    warnings = []

    # Validate required keys
    if missing := REQUIRED_KEYS - set(data.keys()):
        return _fail(f"Missing required keys: {missing}")
    if extra := set(data.keys()) - VALID_KEYS:
        warnings.append(f"Unrecognized keys (ignored): {extra}")

    # Validate grounds
    grounds = data['grounds']
    if not isinstance(grounds, list) or not grounds:
        return _fail("'grounds' must be non-empty list")
    if len(grounds) > 3:
        return _fail(f"'grounds' must contain 1-3 entries (found {len(grounds)})")
    required_ground_keys = {'source', 'content', 'relevance'}
    for idx, ground in enumerate(grounds):
        if not isinstance(ground, dict):
            return _fail(f"Ground {idx}: must be an object (found {type(ground).__name__})")
        if missing_ground := required_ground_keys - set(ground.keys()):
            return _fail(f"Ground {idx}: missing keys {missing_ground}")

    # Validate attacks and defends. A missing or null value means "none";
    # this matches the markdown formatter, which skips falsy values.
    attacks_list = data.get('attacks') or []
    defends_list = data.get('defends') or []
    for key, entries, type_key, limit in (
        ('attacks', attacks_list, 'attack_type', 3),
        ('defends', defends_list, 'defense_type', 2),
    ):
        if not isinstance(entries, list):
            return _fail(f"'{key}' must be a list (found {type(entries).__name__})")
        if len(entries) > limit:
            return _fail(f"Too many {key} ({len(entries)}). Maximum: {limit}")
        for idx, entry in enumerate(entries):
            if not isinstance(entry, dict):
                return _fail(f"'{key}' entry {idx}: must be an object (found {type(entry).__name__})")
            # Entries without a target_id are left out of the metadata, so
            # the type is only required when a target is named.
            if entry.get('target_id') and type_key not in entry:
                return _fail(f"'{key}' entry {idx}: missing key '{type_key}'")

    # Generate argument ID
    side_abbr = 'prop' if side == 'proposition' else 'opp'
    if index is not None:
        # Only a-z are available as suffixes; anything else would put
        # punctuation into the ID and the file name.
        if not 0 <= index < 26:
            return _fail(f"Argument index {index} out of range (0-25)")
        # Multiple arguments: prop_000a, prop_000b, prop_000c, etc.
        arg_id = f"{side_abbr}_{exchange:03d}{chr(ord('a') + index)}"
    else:
        # Single argument: prop_005
        arg_id = f"{side_abbr}_{exchange:03d}"

    # Create metadata
    metadata = {
        'id': arg_id,
        'side': side_abbr,
        'exchange': exchange,
        'title': data['title'],
        'claim': data['claim'],
        'attacks': [
            {'target_id': a['target_id'], 'type': a['attack_type']}
            for a in attacks_list if a.get('target_id')
        ],
        'defends': [
            {'target_id': d['target_id'], 'type': d['defense_type']}
            for d in defends_list if d.get('target_id')
        ]
    }

    # Write argument file
    arg_file = Path.cwd() / debate / 'arguments' / f'{arg_id}.md'
    arg_file.parent.mkdir(parents=True, exist_ok=True)
    frontmatter.dump(frontmatter.Document(metadata, _format_argument_markdown(data)), arg_file)
    return ProcessResult(success=True, argument_id=arg_id, warnings=warnings or None)
def _format_argument_markdown(data: dict[str, Any]) -> str:
    """Render an argument object as the markdown body of its file.

    Emits the claim, the numbered grounds and the warrant, followed by the
    backing, qualifier, attacks and defends sections when those values are
    present and truthy. Sections are separated by blank lines.
    """
    def _relation_blocks(entries, heading_prefix, type_key):
        # Three blocks per entry: heading, type line, free-form content.
        for entry in entries:
            yield f"{heading_prefix} {entry.get('target_id', 'unknown')}"
            yield f"**Type:** {entry.get(type_key, 'unspecified')}"
            yield entry.get('content', '')

    blocks = [f"## Claim\n\n{data['claim']}", "## Grounds"]

    # Grounds are numbered from 1 in the rendered document.
    for number, ground in enumerate(data['grounds'], start=1):
        blocks.append(f"### {number}. {ground['source']}")
        blocks.append(f"> {ground['content']}")
        blocks.append(f"**Relevance:** {ground['relevance']}")

    blocks.append(f"## Warrant\n\n{data['warrant']}")

    backing = data.get('backing')
    if backing:
        blocks.append(f"## Backing\n\n{backing}")

    qualifier = data.get('qualifier')
    if qualifier:
        blocks.append(f"## Qualifier\n\n{qualifier}")

    attacks = data.get('attacks')
    if attacks:
        blocks.append("## Attacks")
        blocks.extend(_relation_blocks(attacks, "### Attacking", 'attack_type'))

    defends = data.get('defends')
    if defends:
        blocks.append("## Defends")
        blocks.extend(_relation_blocks(defends, "### Defending", 'defense_type'))

    return '\n\n'.join(blocks)