Initial commit

This commit is contained in:
Zhongwei Li
2025-11-30 09:03:57 +08:00
commit 690b868796
19 changed files with 1888 additions and 0 deletions

View File

@@ -0,0 +1,268 @@
---
name: debate-orchestrator
description: Orchestrates formal debates with proposition and opposition sides, coordinating debaters and judges through structured exchanges. Use when running debate exchanges, managing debate rounds, or continuing interrupted debates.
---
# Debate Orchestrator
Manages formal debate execution through deterministic state tracking and resumability.
## State Machine
Debates cycle through 2 phases per exchange:
| current_phase | Action Required |
|---------------|-----------------|
| `awaiting_arguments` | Spawn both debaters in parallel |
| `awaiting_judgment` | Spawn judge to evaluate all new arguments |
After judgment, the cycle repeats with `current_exchange` incremented.
**Key Properties:**
- No "complete" state - orchestrator decides when to stop based on requested exchange count
- Parallel execution - both sides argue simultaneously each exchange
- Resumable - read state, execute required action, repeat
- Exchange 0 is special (opening) - both sides produce 3 independent arguments
- Exchanges 1+ are rebuttals - each side produces a single argument with attacks/defends
## Running Exchanges
### 1. Read State
Check `{debate}/debate.md` frontmatter (JSON format):
```json
{
"current_exchange": 0,
"current_phase": "awaiting_arguments"
}
```
Extract motion from the `# Motion` section (first markdown heading after frontmatter).
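In code terms, the read amounts to something like this sketch (assumes the `debate_ops.frontmatter` helper shipped alongside this skill; `read_state_and_motion` is a hypothetical name):
```python
# Sketch only: read state from the JSON frontmatter, then pull the motion
# text from under the first '# Motion' heading. Not shipped code.
from pathlib import Path

from debate_ops import frontmatter

def read_state_and_motion(debate: str) -> tuple[dict, str]:
    doc = frontmatter.load(Path.cwd() / debate / 'debate.md')
    state = {
        'current_exchange': doc['current_exchange'],
        'current_phase': doc['current_phase'],
    }
    motion = ''
    lines = doc.content.splitlines()
    for i, line in enumerate(lines):
        if line.strip() == '# Motion':
            body = []
            for nxt in lines[i + 1:]:
                if nxt.startswith('#'):  # stop at the next heading
                    break
                body.append(nxt)
            motion = '\n'.join(body).strip()
            break
    return state, motion
```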
### 2. Determine Exchange Type
**Opening Exchange**: `current_exchange == 0`
- Both debaters produce 3 independent arguments simultaneously
- Judge scores all 6 arguments
**Rebuttal Exchange**: `current_exchange >= 1`
- Both debaters produce 1 argument simultaneously
- Judge scores both new arguments
### 3. Execute Based on Phase + Exchange Type
#### Opening Exchange (Exchange 0)
When `current_exchange == 0` and `current_phase == awaiting_arguments`:
**Load template:**
Read `templates/debater-opening.md` from this skill's directory.
**Spawn both debaters in parallel:**
Use a single message with two Task tool invocations to spawn both debaters simultaneously.
For each side (`proposition` and `opposition`):
1. Substitute placeholders in template:
- `{motion}`: Extracted motion text
- `{side}`: Side name (`proposition` or `opposition`)
2. Spawn debater:
```
Use Task tool with subagent_type: "debater"
Prompt: [substituted template content]
```
**Process outputs:**
After both debaters complete:
1. Write proposition output to `/tmp/prop_arg.json`
2. Write opposition output to `/tmp/opp_arg.json`
3. Execute the Python package `debate_ops`: `python3 {skill_base_dir}/debate_ops process-exchange {debate} 0 --prop-file /tmp/prop_arg.json --opp-file /tmp/opp_arg.json`
Check result JSON for errors or warnings. On errors, state remains unchanged - report to user and halt. On warnings, note them and continue.
The script creates 6 argument files: `prop_000a.md`, `prop_000b.md`, `prop_000c.md`, `opp_000a.md`, `opp_000b.md`, `opp_000c.md`
State automatically updates to `current_phase: awaiting_judgment`.
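For reference, each debater's output written to the temp files must be JSON the processing script will accept (required keys: `title`, `claim`, `grounds` with 1-3 entries, `warrant`; optional: `backing`, `qualifier`, `attacks`, `defends`). For the opening exchange the output is a list of three argument objects; one object is sketched below with illustrative values:
```json
[
  {
    "title": "Short argument title",
    "claim": "The core claim being advanced",
    "grounds": [
      {
        "source": "Cited source",
        "content": "Quoted or summarized evidence",
        "relevance": "Why this supports the claim"
      }
    ],
    "warrant": "How the grounds license the claim",
    "backing": "Optional support for the warrant",
    "qualifier": "Optional scope limitation"
  }
]
```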
**Judge opening arguments:**
When `current_exchange == 0` and `current_phase == awaiting_judgment`:
**Load template:**
Read `templates/judge.md` from this skill's directory.
**Substitute placeholders:**
- `{argument_files}`: Space-separated list of all 6 opening arguments:
```
@{debate}/arguments/prop_000a.md @{debate}/arguments/prop_000b.md @{debate}/arguments/prop_000c.md @{debate}/arguments/opp_000a.md @{debate}/arguments/opp_000b.md @{debate}/arguments/opp_000c.md
```
- `{motion}`: Extracted motion text
**Spawn judge:**
```
Use Task tool with subagent_type: "judge"
Prompt: [substituted template content]
```
**Process output:**
1. Use Write tool to save agent output to `/tmp/judge.json`
2. Execute the Python package `debate_ops`: `python3 {skill_base_dir}/debate_ops process-judge {debate} --json-file /tmp/judge.json`
Check result JSON for errors or warnings. On errors, state remains unchanged - report to user and halt. On warnings, note them and continue.
State automatically updates to `current_phase: awaiting_arguments`, `current_exchange: 1`.
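For reference, the judge output saved to `/tmp/judge.json` must be JSON in a shape the processing script accepts: either a single `{argument_id, score, reasoning}` object or a `scores` list whose values sum to 0, optionally with a `rescores` list whose deltas stay within [-0.5, 0.5]. A sketch with illustrative values:
```json
{
  "scores": [
    {"argument_id": "prop_000a", "score": 0.4, "reasoning": "Strongest evidence chain"},
    {"argument_id": "opp_000a", "score": -0.4, "reasoning": "Warrant left undefended"}
  ],
  "rescores": [
    {"argument_id": "prop_000b", "old_score": 0.3, "new_score": 0.1, "reasoning": "Undercut by new grounds"}
  ]
}
```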
#### Rebuttal Exchange (Exchange 1+)
When `current_exchange >= 1` and `current_phase == awaiting_arguments`:
**Build argument context:**
1. List all files in `{debate}/arguments/`
2. Separate into proposition and opposition arguments:
- Proposition: Files matching `prop_*.md`
- Opposition: Files matching `opp_*.md`
3. Filter to arguments from previous exchanges only:
- Extract exchange number from filename (e.g., `prop_003` → exchange 3)
- Include only arguments where exchange < current_exchange
4. Sort by exchange number (chronological order) - a filtering sketch follows below
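A minimal sketch of this filtering (hypothetical helper; assumes the `prop_*`/`opp_*` naming scheme the processing script writes):
```python
# Sketch only: list prior-exchange argument files, split by side.
from pathlib import Path

def prior_arguments(debate: str, current_exchange: int) -> tuple[list[Path], list[Path]]:
    """Return (proposition, opposition) argument files from exchanges before the current one."""
    args_dir = Path.cwd() / debate / 'arguments'

    def exchange_of(f: Path) -> int:
        # prop_000a.md -> 0, opp_003.md -> 3 (zero-padded digits after the underscore)
        return int(f.stem.split('_')[1][:3])

    prior = sorted(
        (f for f in args_dir.glob('*.md') if exchange_of(f) < current_exchange),
        key=exchange_of,
    )
    prop = [f for f in prior if f.name.startswith('prop_')]
    opp = [f for f in prior if f.name.startswith('opp_')]
    return prop, opp
```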
**Load template:**
Read `templates/debater-rebuttal.md` from this skill's directory.
**Spawn both debaters in parallel:**
Use a single message with two Task tool invocations to spawn both debaters simultaneously.
For proposition debater:
- Substitute placeholders:
- `{motion}`: Extracted motion text
- `{side}`: `proposition`
- `{exchange}`: Current exchange number
- `{your_arguments}`: Newline-separated list: `@{debate}/arguments/prop_000a.md`, `@{debate}/arguments/prop_000b.md`, etc.
- `{opponent_arguments}`: Newline-separated list: `@{debate}/arguments/opp_000a.md`, `@{debate}/arguments/opp_000b.md`, etc.
For opposition debater:
- Substitute placeholders:
- `{motion}`: Extracted motion text
- `{side}`: `opposition`
- `{exchange}`: Current exchange number
- `{your_arguments}`: Newline-separated list of opposition arguments
- `{opponent_arguments}`: Newline-separated list of proposition arguments
**Process outputs:**
After both debaters complete:
1. Write proposition output to `/tmp/prop_arg.json`
2. Write opposition output to `/tmp/opp_arg.json`
3. Execute the Python package `debate_ops`: `python3 {skill_base_dir}/debate_ops process-exchange {debate} {current_exchange} --prop-file /tmp/prop_arg.json --opp-file /tmp/opp_arg.json`
Check result JSON for errors or warnings. On errors, state remains unchanged - report to user and halt. On warnings, note them and continue.
State automatically updates to `current_phase: awaiting_judgment`.
**Judge rebuttal arguments:**
When `current_exchange >= 1` and `current_phase == awaiting_judgment`:
**Load template:**
Read `templates/judge.md` from this skill's directory.
**Substitute placeholders:**
- `{argument_files}`: Space-separated list of both new arguments:
```
@{debate}/arguments/prop_{current_exchange:03d}.md @{debate}/arguments/opp_{current_exchange:03d}.md
```
- `{motion}`: Extracted motion text
**Spawn judge:**
```
Use Task tool with subagent_type: "judge"
Prompt: [substituted template content]
```
**Process output:**
1. Use Write tool to save agent output to `/tmp/judge.json`
2. Execute the Python package `debate_ops`: `python3 {skill_base_dir}/debate_ops process-judge {debate} --json-file /tmp/judge.json`
Check result JSON for errors or warnings. On errors, state remains unchanged - report to user and halt. On warnings, note them and continue.
State automatically updates to `current_phase: awaiting_arguments`, `current_exchange` incremented.
### 4. Decide When to Stop
After each phase, check if you should continue:
- Read the updated state from `{debate}/debate.md`
- Compare current exchange number to requested total exchanges
- If sufficient exchanges completed: stop and report
- Otherwise: loop back to step 1
The state itself doesn't track "completion" - you decide when done based on user request.
## Error Handling
Processing scripts return:
```json
{
"success": true/false,
"argument_id": "prop_001" | ["prop_000a", "prop_000b", "prop_000c"],
"errors": ["fatal errors"],
"warnings": ["non-fatal warnings"]
}
```
**On errors:**
- State remains unchanged - can safely retry
- Report error to user
- Ask how to proceed (retry, skip, abort)
**On warnings:**
- Note them
- Continue execution
- Mention warnings in completion summary
Note: The script deletes the temp files by default. If a Write to a temp file fails because the file already exists, `Read` it first, then retry the Write.
## Resumability
Execution can be interrupted at any point and resumed by reading state:
- State indicates exactly what phase is needed next
- Execute that phase
- State updates atomically on success
- On failure, state remains unchanged - retry is safe
## Completion Report
When requested exchanges complete, report current state:
```
✓ Completed {N} exchanges for '{debate_slug}'
**Current Scores** (zero-sum tug-of-war):
- Proposition: {total} ({count} arguments)
- Opposition: {total} ({count} arguments)
**Next steps**:
- Continue debating: `/debate-run {debate_slug} X` to run X more exchanges
- Generate report: `/debate-report {debate_slug}` to create comprehensive analysis with visualizations
```
Extract totals and counts from `cumulative_scores` in `{debate}/debate.md` frontmatter.
Total exchanges = `current_exchange` from `debate.md`.
**Note on zero-sum scoring:** Positive total = winning, negative total = losing, zero = even. One side typically has a positive total and the other a negative one (tug-of-war). For example, an exchange scored +0.3 / -0.3 raises Proposition's total by 0.3 and lowers Opposition's by the same amount.
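For reference, the `cumulative_scores` block in the frontmatter has this shape (values illustrative; here Proposition leads, with 4 scored arguments per side):
```json
{
  "cumulative_scores": {
    "proposition": {"total": 0.45, "count": 4},
    "opposition": {"total": -0.45, "count": 4}
  }
}
```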

View File

@@ -0,0 +1 @@
"""Debate operations - Deterministic processing of debate artifacts."""

View File

@@ -0,0 +1,192 @@
#!/usr/bin/env python3
"""CLI entry point for debate operations.
This module makes the package executable via:
python3 /path/to/debate_ops/__main__.py <command>
The path setup below ensures absolute imports work when run as a script.
"""
import json
import sys
from pathlib import Path
# Add package parent directory to sys.path for absolute imports
# This allows: python3 .claude/skills/debate-orchestrator/debate_ops/__main__.py
_package_dir = Path(__file__).resolve().parent.parent
if str(_package_dir) not in sys.path:
sys.path.insert(0, str(_package_dir))
from debate_ops.debater import process_debater
from debate_ops.judge import process_judge
from debate_ops.state import update_debate_state
def main() -> None:
"""Main CLI entry point."""
if len(sys.argv) < 2:
print(
json.dumps(
{"success": False, "error": "Usage: python3 -m debate_ops <command> <args...>"}
),
file=sys.stderr,
)
sys.exit(1)
command = sys.argv[1]
try:
if command == "process-exchange":
# New command: processes both sides and updates state
# Usage: python3 -m debate_ops process-exchange <debate> <exchange> --prop-file <path> --opp-file <path>
if len(sys.argv) != 8:
print(
json.dumps(
{
"success": False,
"error": "Usage: process-exchange <debate> <exchange> --prop-file <path> --opp-file <path>",
}
),
file=sys.stderr,
)
sys.exit(1)
debate = sys.argv[2]
exchange = int(sys.argv[3])
# Extract file paths
try:
prop_file_idx = sys.argv.index("--prop-file")
prop_file_path = Path(sys.argv[prop_file_idx + 1])
opp_file_idx = sys.argv.index("--opp-file")
opp_file_path = Path(sys.argv[opp_file_idx + 1])
except (ValueError, IndexError):
print(
json.dumps({"success": False, "error": "Both --prop-file and --opp-file required"}),
file=sys.stderr,
)
sys.exit(1)
# Process proposition side
prop_output = prop_file_path.read_text()
result_prop = process_debater(
debate=debate,
side='proposition',
exchange=exchange,
output=prop_output,
)
if not result_prop.success:
prop_file_path.unlink(missing_ok=True)
opp_file_path.unlink(missing_ok=True)
print(json.dumps({
"success": False,
"side": "proposition",
"errors": result_prop.errors,
"warnings": result_prop.warnings,
}), file=sys.stderr)
sys.exit(1)
# Process opposition side
opp_output = opp_file_path.read_text()
result_opp = process_debater(
debate=debate,
side='opposition',
exchange=exchange,
output=opp_output,
)
if not result_opp.success:
prop_file_path.unlink(missing_ok=True)
opp_file_path.unlink(missing_ok=True)
print(json.dumps({
"success": False,
"side": "opposition",
"errors": result_opp.errors,
"warnings": result_opp.warnings,
}), file=sys.stderr)
sys.exit(1)
# Both sides processed successfully - update state
update_debate_state(debate, current_phase='awaiting_judgment')
# Clean up temp files
prop_file_path.unlink(missing_ok=True)
opp_file_path.unlink(missing_ok=True)
# Output combined result
output_dict = {
"success": True,
"argument_id": {
"proposition": result_prop.argument_id,
"opposition": result_opp.argument_id,
},
"warnings": (result_prop.warnings or []) + (result_opp.warnings or []) or None,
}
print(json.dumps(output_dict, indent=2))
sys.exit(0)
elif command == "process-judge":
# Usage: python3 -m debate_ops process-judge <debate> --json-file <path>
if len(sys.argv) != 5:
print(
json.dumps(
{
"success": False,
"error": "Usage: process-judge <debate> --json-file <path>",
}
),
file=sys.stderr,
)
sys.exit(1)
try:
json_file_idx = sys.argv.index("--json-file")
json_file_path = Path(sys.argv[json_file_idx + 1])
output = json_file_path.read_text()
except (ValueError, IndexError):
print(
json.dumps({"success": False, "error": "--json-file parameter required"}),
file=sys.stderr,
)
sys.exit(1)
result = process_judge(debate=sys.argv[2], output=output)
# Clean up temp file
json_file_path.unlink(missing_ok=True)
# Output result
output_dict = {
"success": result.success,
"argument_id": result.argument_id,
**({"score": result.score} if result.score is not None else {}),
**({"rescored": result.rescored} if result.rescored else {}),
**({"errors": result.errors} if result.errors else {}),
**({"warnings": result.warnings} if result.warnings else {}),
}
print(json.dumps(output_dict, indent=2))
sys.exit(0 if result.success else 1)
else:
print(
json.dumps({"success": False, "error": f"Unknown command: {command}"}),
file=sys.stderr,
)
sys.exit(1)
except Exception as e:
print(json.dumps({"success": False, "error": str(e)}), file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,198 @@
"""Process debater agent outputs."""
from __future__ import annotations
import json
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal
from debate_ops import frontmatter
@dataclass
class ProcessResult:
success: bool
argument_id: str | list[str] | None = None
errors: list[str] | None = None
warnings: list[str] | None = None
REQUIRED_KEYS = {'title', 'claim', 'grounds', 'warrant'}
OPTIONAL_KEYS = {'backing', 'qualifier', 'attacks', 'defends'}
VALID_KEYS = REQUIRED_KEYS | OPTIONAL_KEYS
def process_debater(
debate: str,
side: Literal['proposition', 'opposition'],
exchange: int,
output: str | dict | list
) -> ProcessResult:
"""Process debater output, handling both single arguments and lists of arguments."""
# Parse input to get data structure
if isinstance(output, str):
cleaned = re.sub(r'^```(?:json|yaml)?\s*|\s*```$', '', output.strip(), flags=re.MULTILINE)
try:
parsed = json.loads(cleaned)
except json.JSONDecodeError as e:
return ProcessResult(success=False, errors=[f"Invalid JSON: {e}"])
else:
parsed = output
# Determine if single argument or list of arguments
if isinstance(parsed, list):
# Multiple arguments (e.g., opening statements)
if not parsed:
return ProcessResult(success=False, errors=["Empty argument list"])
all_warnings = []
arg_ids = []
for idx, arg_data in enumerate(parsed):
result = _process_single_argument(
debate=debate,
side=side,
exchange=exchange,
data=arg_data,
index=idx
)
if not result.success:
return result # Fail fast on any error
arg_ids.append(result.argument_id)
if result.warnings:
all_warnings.extend(result.warnings)
return ProcessResult(
success=True,
argument_id=arg_ids,
warnings=all_warnings or None
)
else:
# Single argument (standard case)
result = _process_single_argument(
debate=debate,
side=side,
exchange=exchange,
data=parsed,
index=None
)
return result
def _process_single_argument(
debate: str,
side: Literal['proposition', 'opposition'],
exchange: int,
data: dict,
index: int | None = None
) -> ProcessResult:
"""Process a single argument and create its file. Does not update debate state."""
warnings = []
# Validate required keys
if missing := REQUIRED_KEYS - set(data.keys()):
return ProcessResult(success=False, errors=[f"Missing required keys: {missing}"])
if extra := set(data.keys()) - VALID_KEYS:
warnings.append(f"Unrecognized keys (ignored): {extra}")
# Validate grounds
if not isinstance(data['grounds'], list) or not data['grounds']:
return ProcessResult(success=False, errors=["'grounds' must be non-empty list"])
if not (1 <= len(data['grounds']) <= 3):
return ProcessResult(success=False, errors=[f"'grounds' must contain 1-3 entries (found {len(data['grounds'])})"])
required_ground_keys = {'source', 'content', 'relevance'}
for idx, ground in enumerate(data['grounds']):
if missing_ground := required_ground_keys - set(ground.keys()):
return ProcessResult(success=False, errors=[f"Ground {idx}: missing keys {missing_ground}"])
# Validate attacks
if len(attacks_list := data.get('attacks', [])) > 3:
return ProcessResult(success=False, errors=[f"Too many attacks ({len(attacks_list)}). Maximum: 3"])
# Validate defends
if len(defends_list := data.get('defends', [])) > 2:
return ProcessResult(success=False, errors=[f"Too many defends ({len(defends_list)}). Maximum: 2"])
# Generate argument ID
side_abbr = 'prop' if side == 'proposition' else 'opp'
if index is not None:
# Multiple arguments: prop_000a, prop_000b, prop_000c, etc.
arg_id = f"{side_abbr}_{exchange:03d}{chr(ord('a') + index)}"
else:
# Single argument: prop_005
arg_id = f"{side_abbr}_{exchange:03d}"
# Create metadata
metadata = {
'id': arg_id,
'side': side_abbr,
'exchange': exchange,
'title': data['title'],
'claim': data['claim'],
'attacks': [
{'target_id': a['target_id'], 'type': a.get('attack_type', 'unspecified')}
for a in attacks_list if a.get('target_id')
],
'defends': [
{'target_id': d['target_id'], 'type': d.get('defense_type', 'unspecified')}
for d in data.get('defends', []) if d.get('target_id')
]
}
# Write argument file
arg_file = Path.cwd() / debate / 'arguments' / f'{arg_id}.md'
arg_file.parent.mkdir(parents=True, exist_ok=True)
frontmatter.dump(frontmatter.Document(metadata, _format_argument_markdown(data)), arg_file)
return ProcessResult(success=True, argument_id=arg_id, warnings=warnings or None)
def _format_argument_markdown(data: dict[str, Any]) -> str:
sections = [f"## Claim\n\n{data['claim']}", "## Grounds"]
# Updated to new ground structure
for idx, g in enumerate(data['grounds'], 1):
sections.extend([
f"### {idx}. {g['source']}",
f"> {g['content']}",
f"**Relevance:** {g['relevance']}"
])
sections.append(f"## Warrant\n\n{data['warrant']}")
if backing := data.get('backing'):
sections.append(f"## Backing\n\n{backing}")
if qualifier := data.get('qualifier'):
sections.append(f"## Qualifier\n\n{qualifier}")
if attacks := data.get('attacks'):
sections.append("## Attacks")
for a in attacks:
sections.extend([
f"### Attacking {a.get('target_id', 'unknown')}",
f"**Type:** {a.get('attack_type', 'unspecified')}",
a.get('content', '')
])
if defends := data.get('defends'):
sections.append("## Defends")
for d in defends:
sections.extend([
f"### Defending {d.get('target_id', 'unknown')}",
f"**Type:** {d.get('defense_type', 'unspecified')}",
d.get('content', '')
])
return '\n\n'.join(sections)

View File

@@ -0,0 +1,80 @@
"""JSON frontmatter parsing - zero dependencies."""
from __future__ import annotations
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any
@dataclass
class Document:
metadata: dict[str, Any]
content: str
def __getitem__(self, key: str) -> Any:
return self.metadata[key]
def get(self, key: str, default: Any = None) -> Any:
return self.metadata.get(key, default)
def _find_json_end(text: str) -> int:
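"""Return the index one past the closing '}' of the leading JSON object, or -1 if unbalanced."""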
depth = in_string = escape_next = 0
for i, char in enumerate(text):
if escape_next:
escape_next = 0
continue
if char == '\\':
escape_next = 1
continue
if char == '"' and not escape_next:
in_string = not in_string
continue
if in_string:
continue
if char == '{':
depth += 1
elif char == '}':
depth -= 1
if depth == 0:
return i + 1
return -1
def parse(text: str) -> Document:
text = text.lstrip()
if not text.startswith('{'):
raise ValueError("JSON frontmatter must start with '{'")
end_pos = _find_json_end(text)
if end_pos == -1:
raise ValueError("JSON frontmatter not properly closed")
try:
metadata = json.loads(text[:end_pos])
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in frontmatter: {e}")
if not isinstance(metadata, dict):
raise ValueError("JSON frontmatter must be object")
return Document(metadata=metadata, content=text[end_pos:].lstrip('\n'))
def dumps(metadata: dict[str, Any], content: str) -> str:
return f"{json.dumps(metadata, indent=2)}\n\n{content}"
def load(filepath: Path | str) -> Document:
return parse(Path(filepath).read_text())
def dump(doc: Document, filepath: Path | str) -> None:
Path(filepath).write_text(dumps(doc.metadata, doc.content))
def update_metadata(filepath: Path | str, **updates: Any) -> None:
doc = load(filepath)
doc.metadata.update(updates)
dump(doc, filepath)

View File

@@ -0,0 +1,227 @@
"""Process judge agent outputs."""
from __future__ import annotations
import json
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from debate_ops import frontmatter
from debate_ops import mermaid
from debate_ops.state import update_debate_state, read_debate_state
@dataclass
class ProcessResult:
success: bool
argument_id: str | list[str] | None = None
score: float | list[float] | None = None
rescored: list[str] | None = None
errors: list[str] | None = None
warnings: list[str] | None = None
def _parse_judge_output(output: str | dict) -> dict | ProcessResult:
"""Parse judge output from string or dict. Returns parsed dict or ProcessResult on error."""
if isinstance(output, str):
cleaned = re.sub(r'^```(?:json|yaml)?\s*|\s*```$', '', output.strip(), flags=re.MULTILINE)
try:
return json.loads(cleaned)
except json.JSONDecodeError as e:
return ProcessResult(success=False, errors=[f"Invalid JSON: {e}"])
return output
def _normalize_scores(data: dict) -> list[dict] | ProcessResult:
"""Normalize single or multiple score formats to unified list structure.
Returns list of dicts with keys: argument_id, score, reasoning
Or ProcessResult on error.
"""
if 'scores' in data:
# Multiple arguments format
if not isinstance(data['scores'], list) or not data['scores']:
return ProcessResult(success=False, errors=["'scores' must be non-empty list"])
normalized = []
for entry in data['scores']:
if missing := {'argument_id', 'score', 'reasoning'} - set(entry.keys()):
return ProcessResult(success=False, errors=[f"Score entry missing keys: {missing}"])
if not (-1 <= entry['score'] <= 1):
return ProcessResult(success=False, errors=[f"Score {entry['score']} for {entry['argument_id']} outside valid range [-1, 1]"])
normalized.append(entry)
# Zero-sum validation
total = sum(entry['score'] for entry in normalized)
if abs(total) > 0.01: # Tolerance for floating point
return ProcessResult(success=False, errors=[f"Scores must sum to 0 (got {total:.3f})"])
return normalized
else:
# Single argument format
if missing := {'argument_id', 'score', 'reasoning'} - set(data.keys()):
return ProcessResult(success=False, errors=[f"Missing required keys: {missing}"])
if not (-1 <= data['score'] <= 1):
return ProcessResult(success=False, errors=[f"Score {data['score']} outside valid range [-1, 1]"])
return [{'argument_id': data['argument_id'], 'score': data['score'], 'reasoning': data['reasoning']}]
def process_judge(debate: str, output: str | dict) -> ProcessResult:
"""Process judge output and update debate state."""
warnings = []
# Parse input
data = _parse_judge_output(output)
if isinstance(data, ProcessResult): # Error case
return data
# Normalize to unified structure
scores_normalized = _normalize_scores(data)
if isinstance(scores_normalized, ProcessResult): # Error case
return scores_normalized
# Record all primary scores
debate_dir = Path.cwd() / debate
scores_file = debate_dir / 'scores.json'
arg_ids, score_values = [], []
for entry in scores_normalized:
_record_score(scores_file, entry['argument_id'], entry['score'], entry['reasoning'], triggered_by=None)
arg_ids.append(entry['argument_id'])
score_values.append(entry['score'])
# Process rescores, update state, generate artifacts (unified flow)
rescored = _process_rescores(scores_file, data.get('rescores', []), warnings, triggered_by_list=arg_ids)
_update_cumulative_scores(debate, scores_file)
mermaid.generate_graph(debate)
_update_state_after_judgment(debate)
# Return result (preserve single vs multiple structure for backward compatibility)
return ProcessResult(
success=True,
argument_id=arg_ids if len(arg_ids) > 1 else arg_ids[0],
score=score_values if len(score_values) > 1 else score_values[0],
rescored=rescored or None,
warnings=warnings or None
)
def _process_rescores(
scores_file: Path,
rescores: list,
warnings: list,
triggered_by_list: list[str]
) -> list[str]:
"""Process rescores and return list of rescored argument IDs."""
rescored = []
for rescore in rescores:
if not (rescore_id := rescore.get('argument_id')) or (new_score := rescore.get('new_score')) is None:
warnings.append(f"Incomplete rescore entry: {rescore}")
continue
old_score = rescore.get('old_score')
rescore_reasoning = rescore.get('reasoning', '')
# Validate rescore is an adjustment (delta), not absolute score
if old_score is not None:
delta = new_score - old_score
if not (-0.5 <= delta <= 0.5):
warnings.append(f"Rescore delta for {rescore_id} is {delta:.3f}, outside valid range [-0.5, 0.5]")
continue
# For rescores triggered by multiple arguments, use first one
triggered_by = triggered_by_list[0] if triggered_by_list else None
_record_score(
scores_file, rescore_id, new_score, rescore_reasoning,
triggered_by=triggered_by, previous_score=old_score
)
rescored.append(rescore_id)
return rescored
def _update_state_after_judgment(debate: str) -> None:
"""Update debate state after judgment completes."""
state = read_debate_state(debate)
update_debate_state(
debate,
current_phase='awaiting_arguments',
current_exchange=state['current_exchange'] + 1
)
def _record_score(
file: Path,
arg_id: str,
score: float,
reasoning: str,
triggered_by: str | None = None,
previous_score: float | None = None
) -> None:
"""Record a score or rescore in the argument-centric structure."""
# Load existing data or initialize
if file.exists():
with open(file) as f:
data = json.load(f)
else:
data = {}
# Ensure argument entry exists
if arg_id not in data:
data[arg_id] = {
'current_score': score,
'history': []
}
# Build history entry
entry = {
'score': score,
'reasoning': reasoning,
'scored_at': datetime.now(timezone.utc).isoformat()
}
# If this is a rescore (has triggered_by), add rescore fields
if triggered_by:
entry['triggered_by'] = triggered_by
if previous_score is not None:
entry['previous_score'] = previous_score
entry['diff'] = round(score - previous_score, 3)
# Append to history and update current score
data[arg_id]['history'].append(entry)
data[arg_id]['current_score'] = score
# Save
with open(file, 'w') as f:
json.dump(data, f, indent=2)
def _update_cumulative_scores(debate: str, scores_file: Path) -> None:
"""Update cumulative scores in debate.md frontmatter (zero-sum tug-of-war)."""
if not scores_file.exists():
return
with open(scores_file) as f:
data = json.load(f)
# Extract current scores
prop_scores = [arg_data['current_score'] for arg_id, arg_data in data.items() if arg_id.startswith('prop_')]
opp_scores = [arg_data['current_score'] for arg_id, arg_data in data.items() if arg_id.startswith('opp_')]
# Zero-sum tug-of-war: sum all scores for each side
prop_total = round(sum(prop_scores), 3) if prop_scores else 0
opp_total = round(sum(opp_scores), 3) if opp_scores else 0
doc = frontmatter.load(Path.cwd() / debate / 'debate.md')
doc.metadata['cumulative_scores'] = {
'proposition': {'total': prop_total, 'count': len(prop_scores)},
'opposition': {'total': opp_total, 'count': len(opp_scores)}
}
frontmatter.dump(doc, Path.cwd() / debate / 'debate.md')

View File

@@ -0,0 +1,123 @@
"""Generate mermaid argument graph from debate state."""
from __future__ import annotations
import json
from pathlib import Path
from debate_ops import frontmatter
def generate_graph(debate: str) -> None:
"""Generate mermaid flowchart showing argument relationships and scores.
Reads argument structure from frontmatter, scores from scores.json.
Updates or creates {debate}/argument-graph.mmd.
"""
debate_dir = Path.cwd() / debate
args_dir = debate_dir / 'arguments'
if not args_dir.exists():
return
# Load scores from scores.json
scores_file = debate_dir / 'scores.json'
scores_data = json.loads(scores_file.read_text()) if scores_file.exists() else {}
# Collect argument data
arguments = []
for arg_file in sorted(args_dir.glob('*.md')):
doc = frontmatter.load(arg_file)
meta = doc.metadata
arg_id = meta.get('id', arg_file.stem)
# Get score from scores.json instead of frontmatter
score = scores_data.get(arg_id, {}).get('current_score', None)
# Get attacks and defends (expect dict format with target_id and type)
attacks = meta.get('attacks', [])
defends = meta.get('defends', [])
# Use title if available, otherwise fallback to truncated claim
display_text = meta.get('title', meta.get('claim', 'No claim')[:50] + ('...' if len(meta.get('claim', '')) > 50 else ''))
arguments.append({
'id': arg_id,
'side': meta.get('side', 'unknown'),
'display': display_text,
'score': score,
'attacks': attacks,
'defends': defends
})
if not arguments:
return
# Build mermaid syntax with ELK layout for better visualization
lines = [
'---',
'config:',
' layout: elk',
' elk:',
' nodePlacementStrategy: NETWORK_SIMPLEX',
'---',
'graph TD',
''
]
# Nodes - dark fills with white text for GitHub theme compatibility
for arg in arguments:
raw_score = arg['score']
score = raw_score if raw_score is not None else 0
score_display = f"{raw_score:.2f}" if raw_score is not None else ""
# Proposition: dark green, Opposition: dark red
fill, stroke, border_width = (
('#1B5E20', '#4CAF50', '3px') if arg['side'] == 'prop' and score >= 0.75
else ('#1B5E20', '#4CAF50', '2px') if arg['side'] == 'prop'
else ('#B71C1C', '#F44336', '3px') if score >= 0.75
else ('#B71C1C', '#F44336', '2px')
)
lines.extend([
f' {arg["id"]}["{arg["id"]}<br/>{arg["display"]}<br/>⭐ {score_display}"]',
f' style {arg["id"]} fill:{fill},stroke:{stroke},stroke-width:{border_width},color:#FFFFFF'
])
lines.append('')
# Edges - track index for linkStyle coloring
edge_index = 0
link_styles = []
for arg in arguments:
# Attacks: solid lines, orange color
for attack in arg['attacks']:
target_id = attack['target_id']
attack_type = attack['type'].replace('_attack', '')
lines.append(f' {arg["id"]} -->|⚔️ {attack_type}| {target_id}')
link_styles.append(f' linkStyle {edge_index} stroke:#ff9800,stroke-width:2px')
edge_index += 1
# Defends: blue color, style varies by type
for defend in arg['defends']:
target_id = defend['target_id']
defense_type = defend['type']
if defense_type == 'concede_and_pivot':
# Concede and pivot: dotted line (retreat/weakness)
emoji = '↩️'
lines.append(f' {arg["id"]} -.->|{emoji} {defense_type}| {target_id}')
else:
# Reinforce/clarify: solid line (strengthening)
emoji = '🛡️'
lines.append(f' {arg["id"]} -->|{emoji} {defense_type}| {target_id}')
link_styles.append(f' linkStyle {edge_index} stroke:#2196F3,stroke-width:2px')
edge_index += 1
# Add link styles at the end
if link_styles:
lines.append('')
lines.extend(link_styles)
# Write to file
output_file = debate_dir / 'argument-graph.mmd'
output_file.write_text('\n'.join(lines) + '\n')

View File

@@ -0,0 +1,47 @@
"""Debate state management."""
from __future__ import annotations
from pathlib import Path
from typing import Literal, TypedDict
from debate_ops import frontmatter
Phase = Literal['awaiting_arguments', 'awaiting_judgment']
class DebateState(TypedDict):
"""Debate state from frontmatter."""
debate_id: str
current_exchange: int
current_phase: Phase
def read_debate_state(debate: str) -> DebateState:
"""Read current debate state from debate.md frontmatter."""
debate_file = Path.cwd() / debate / 'debate.md'
doc = frontmatter.load(debate_file)
return DebateState(
debate_id=doc['debate_id'],
current_exchange=doc['current_exchange'],
current_phase=doc['current_phase'] # type: ignore
)
def update_debate_state(
debate: str,
current_exchange: int | None = None,
current_phase: Phase | None = None
) -> None:
"""Update debate.md frontmatter with new state values."""
debate_file = Path.cwd() / debate / 'debate.md'
doc = frontmatter.load(debate_file)
if current_exchange is not None:
doc.metadata['current_exchange'] = current_exchange
if current_phase is not None:
doc.metadata['current_phase'] = current_phase
frontmatter.dump(doc, debate_file)

View File

@@ -0,0 +1,6 @@
Mode: Opening Exchange
Motion: {motion}
Side: {side}
Exchange: 0
This is the opening exchange. Construct three independent arguments establishing your position from distinct angles.

View File

@@ -0,0 +1,12 @@
Mode: Rebuttal Exchange
Motion: {motion}
Side: {side}
Exchange: {exchange}
Your previous arguments:
{your_arguments}
Opponent's arguments:
{opponent_arguments}
Construct one argument advancing your position.

View File

@@ -0,0 +1,5 @@
Evaluate these arguments: {argument_files}
Motion: {motion}
**Zero-sum constraint**: Your scores must sum to exactly 0.