160 lines
5.8 KiB
Python
Executable File
160 lines
5.8 KiB
Python
Executable File
#!/usr/bin/env -S uv run --python 3.10 --script
|
|
# /// script
# requires-python = ">=3.10"
# ///
|
|
"""
NDP Plugin Event Logger

Logs Claude Code events related to NDP plugin usage to a local file.
Enhanced to capture tool names, user input, and agent responses.
"""
|
|
|
|
import json
|
|
import sys
|
|
import os
|
|
import argparse
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
def get_log_file_path():
    """Return the location of the NDP event log file.

    The log is kept in a ``logs`` directory that is a sibling of the directory
    containing this script; the ``logs`` directory is created if it is missing.
    """
    # The plugin root is the parent of the directory holding this script.
    log_directory = Path(__file__).parent.parent.joinpath("logs")
    log_directory.mkdir(exist_ok=True)
    return log_directory.joinpath("ndp_events.log")
|
|
|
|
def _existing_transcript_path(event_data):
    """Return ``event_data['transcript_path']`` if it names an existing path, else None."""
    candidate = event_data.get("transcript_path")
    # A JSON null (or any other non-path value) would make os.path.exists
    # raise TypeError; treat it the same as a missing transcript.
    if isinstance(candidate, (str, bytes, os.PathLike)) and os.path.exists(candidate):
        return candidate
    return None


def _read_transcript_entries(transcript_path, tail=None):
    """Decode a JSON-lines transcript file into a list of entries.

    Blank lines and lines that are not valid JSON are skipped.

    Args:
        transcript_path: Path of the transcript file to read.
        tail: If given, only the last ``tail`` lines of the file are decoded;
            otherwise every line is.

    Returns:
        The decoded JSON values, in file order.

    Raises:
        OSError: If the file cannot be opened or read.
        UnicodeDecodeError: If the file content is not valid UTF-8.
    """
    # Imported locally so this helper does not depend on a module-level import.
    from collections import deque

    entries = []
    # Explicit encoding: the platform default is not guaranteed to be UTF-8.
    with open(transcript_path, "r", encoding="utf-8") as handle:
        # A bounded deque keeps only the trailing lines instead of loading the
        # whole transcript into memory just to slice off its end.
        raw_lines = handle if tail is None else deque(handle, maxlen=tail)
        for raw_line in raw_lines:
            raw_line = raw_line.strip()
            if not raw_line:
                continue
            try:
                entries.append(json.loads(raw_line))
            except json.JSONDecodeError:
                # Tolerate corrupt or partially written lines.
                continue
    return entries


def _entry_role(entry):
    """Return the top-level ``role`` of a transcript entry (None for non-dict entries)."""
    return entry.get("role") if isinstance(entry, dict) else None


def extract_enhanced_data(event_type: str, event_data: dict) -> dict:
    """Build the log record for a single event.

    Every record carries ``timestamp`` (local time, ISO format),
    ``event_type``, ``session_id`` (``"unknown"`` when absent) and
    ``raw_data`` (the untouched ``event_data``). Depending on ``event_type``
    further keys are added:

    * ``PreToolUse`` / ``PostToolUse``: ``tool_name`` and ``tool_input``, taken
      from a non-empty ``tool`` mapping in ``event_data``.
    * ``PostToolUse``: ``tool_result`` / ``tool_output`` / ``tool_error`` copied
      from ``result`` / ``output`` / ``error`` when present, plus
      ``recent_chat`` (entries decoded from the last 5 transcript lines) and
      ``latest_agent_response`` (content of the newest of those entries whose
      role is ``assistant``).
    * ``UserPromptSubmit``: ``user_prompt`` and ``conversation_messages`` copied
      from ``text`` and ``messages`` when present.
    * ``Stop``: ``chat_summary`` (message counts) and ``last_5_messages``.

    Transcript problems never raise: they are recorded under
    ``transcript_read_error`` (PostToolUse) or ``chat_read_error`` (Stop).

    Args:
        event_type: Name of the event being logged.
        event_data: Decoded event payload; must be a mapping.

    Returns:
        The record to be written to the log.
    """
    enhanced = {
        "timestamp": datetime.now().isoformat(),
        "event_type": event_type,
        "session_id": event_data.get("session_id", "unknown"),
    }

    # Tool information for PreToolUse and PostToolUse.
    if event_type in ("PreToolUse", "PostToolUse"):
        tool_data = event_data.get("tool")
        # Ignore a non-mapping "tool" value instead of failing on .get().
        if isinstance(tool_data, dict) and tool_data:
            enhanced["tool_name"] = tool_data.get("name", "unknown")
            enhanced["tool_input"] = tool_data.get("input", {})

    # Tool results for PostToolUse.
    if event_type == "PostToolUse":
        for source_key, target_key in (
            ("result", "tool_result"),
            ("output", "tool_output"),
            ("error", "tool_error"),
        ):
            if source_key in event_data:
                enhanced[target_key] = event_data[source_key]

    # User input for UserPromptSubmit.
    if event_type == "UserPromptSubmit":
        if "text" in event_data:
            enhanced["user_prompt"] = event_data["text"]
        if "messages" in event_data:
            enhanced["conversation_messages"] = event_data["messages"]

    transcript_path = None
    if event_type in ("PostToolUse", "Stop"):
        transcript_path = _existing_transcript_path(event_data)

    # NOTE(review): only a top-level "role" key of each transcript entry is
    # inspected below; confirm that transcript entries carry the role there.

    # For PostToolUse, capture the most recent agent response.
    if event_type == "PostToolUse" and transcript_path is not None:
        try:
            recent_chat = _read_transcript_entries(transcript_path, tail=5)
            enhanced["recent_chat"] = recent_chat
            for entry in reversed(recent_chat):
                if _entry_role(entry) == "assistant":
                    enhanced["latest_agent_response"] = entry.get("content", [])
                    break
        except Exception as e:
            # Best effort: record the problem rather than lose the event.
            enhanced["transcript_read_error"] = str(e)

    # For Stop, summarize the whole conversation.
    if event_type == "Stop" and transcript_path is not None:
        try:
            chat_data = _read_transcript_entries(transcript_path)
            roles = [_entry_role(entry) for entry in chat_data]
            enhanced["chat_summary"] = {
                "total_messages": len(chat_data),
                "user_messages": roles.count("user"),
                "assistant_messages": roles.count("assistant"),
            }
            enhanced["last_5_messages"] = chat_data[-5:]
        except Exception as e:
            # Best effort: record the problem rather than lose the event.
            enhanced["chat_read_error"] = str(e)

    # Include raw event data for completeness.
    enhanced["raw_data"] = event_data

    return enhanced
|
|
|
|
def log_event(event_type: str, event_data: dict):
    """Append one event record to the NDP log file.

    The record produced by ``extract_enhanced_data`` is serialized as a single
    JSON line and appended to the file returned by ``get_log_file_path``. Any
    ``Exception`` raised along the way is reported on stderr and converted
    into a ``False`` return value instead of propagating.

    Args:
        event_type: Name of the event being logged.
        event_data: Event payload handed to ``extract_enhanced_data``.

    Returns:
        True if the record was written, False otherwise.
    """
    try:
        destination = get_log_file_path()
        record = extract_enhanced_data(event_type, event_data)

        # One JSON object per line, appended to the existing log.
        with open(destination, "a") as sink:
            sink.write(json.dumps(record) + "\n")
    except Exception as e:
        # Report the problem but do not raise, so Claude Code is not blocked.
        print(f"Warning: Failed to log event: {e}", file=sys.stderr)
        return False

    return True
|
|
|
|
def main():
    """Command-line entry point: log one event read from stdin.

    Requires an ``--event-type`` argument and reads the event payload as JSON
    from stdin. Empty, malformed, undecodable, or non-object input is logged
    with an empty payload instead of aborting. Once argument parsing has
    succeeded the process exits with status 0, so a logging problem does not
    block Claude Code.
    """
    parser = argparse.ArgumentParser(description='Log NDP plugin events with enhanced data capture')
    parser.add_argument('--event-type', required=True, help='Type of event')
    args = parser.parse_args()

    try:
        # Read event data from stdin.
        event_data = json.load(sys.stdin)
    except ValueError:
        # Covers json.JSONDecodeError (empty or malformed input) as well as
        # UnicodeDecodeError (undecodable bytes); both subclass ValueError.
        event_data = {}

    # extract_enhanced_data calls .get() on the payload, so valid JSON that is
    # not an object (null, a list, a number, ...) is treated like bad input.
    if not isinstance(event_data, dict):
        event_data = {}

    # Log the event with enhanced data.
    log_event(args.event_type, event_data)

    # Always exit successfully to not block Claude Code.
    sys.exit(0)
|
|
|
|
# Run the logger only when this file is executed as a script.
if __name__ == "__main__":
    main()
|