Files
2025-11-29 18:24:34 +08:00

374 lines
10 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 记忆系统使用示例
## 快速开始
### 环境准备
```bash
# Go to the project root
cd /path/to/ai-runtime
# Enter the memory system directory (memory_cli.py is run from here)
cd .ai-runtime/memory
```
### 第一个查询
```bash
# List every recorded memory event
python3 memory_cli.py query
# Show only today's events; the shell expands $(date ...) to YYYY-MM-DD
python3 memory_cli.py query --where "date='$(date +%Y-%m-%d)'"
```
## 情景记忆查询示例
### 时间范围查询
```bash
# Show this week's events (Monday through today).
# GNU `date -d 'last monday'` jumps back a full week when run on a Monday,
# so the offset is derived from the ISO weekday number instead
# (%u: Monday=1 ... Sunday=7, hence "0 days ago" on a Monday).
WEEK_START=$(date -d "$(( $(date +%u) - 1 )) days ago" +%Y-%m-%d)
python3 memory_cli.py query \
  --where "date>='${WEEK_START}'" \
  --order-by "timestamp desc"
# Show events from the last 24 hours
python3 memory_cli.py query \
  --where "timestamp >= '$(date -d '24 hours ago' +%Y-%m-%dT%H:%M:%S)'" \
  --order-by "timestamp desc"
```
### 类型过滤
```bash
# List all decision events, ordered by "timestamp desc"
python3 memory_cli.py query \
--where "type='decision'" \
--order-by "timestamp desc"
# List error events: only the timestamp, title and tags fields, capped at 10 rows
python3 memory_cli.py query \
--where "type='error'" \
--select "timestamp,title,tags" \
--limit 10
```
### 标签查询
```bash
# Find events tagged 'architecture'
python3 memory_cli.py query \
--where "tags CONTAINS 'architecture'"
# Find decisions tagged 'security' (conditions are combined with AND)
python3 memory_cli.py query \
--where "type='decision' AND tags CONTAINS 'security'"
```
### 复杂组合查询
```bash
# Find this month's architecture decisions:
# $(date +%Y-%m-01) expands to the first day of the current month
python3 memory_cli.py query \
--where "date>='$(date +%Y-%m-01)' AND type='decision' AND tags CONTAINS 'architecture'" \
--order-by "timestamp desc" \
--limit 20
```
## 数据导出和分析
### JSON格式导出
```bash
# Export the events of the last 7 days as JSON
# (a rolling 7-day window, not the calendar week)
python3 memory_cli.py query \
--where "date>='$(date -d '7 days ago' +%Y-%m-%d)'" \
--format json \
--order-by "timestamp desc" > weekly-events.json
```
### 统计分析
```bash
# Count events per type, most frequent first
python3 memory_cli.py query \
  --select "type" \
  --format json | jq -r '.[].type' | sort | uniq -c | sort -nr
# Top 10 tags by frequency.
# `.tags[]?` skips events whose tags field is null or missing instead of
# aborting the whole pipeline with "Cannot iterate over null".
python3 memory_cli.py query \
  --select "tags" \
  --format json | jq -r '.[].tags[]?' | sort | uniq -c | sort -nr | head -10
```
### 时间线分析
```bash
# Count events per day, in chronological order.
# `sort | uniq -c` already yields one line per date in ascending date order;
# a further plain `sort` would re-order the lines by the space-padded count
# (and do so differently depending on the locale), losing the timeline.
python3 memory_cli.py query \
  --select "date" \
  --format json | jq -r '.[].date' | sort | uniq -c
```
## 编程接口使用
### Python集成示例
```python
#!/usr/bin/env python3
"""记忆系统集成示例"""
import sys
from pathlib import Path
from memory_discovery import MemoryDiscovery
class MemoryAnalytics:
    """Convenience queries and summary statistics on top of MemoryDiscovery."""

    def __init__(self, memory_root: str):
        """Create the underlying MemoryDiscovery for ``memory_root``."""
        self.discovery = MemoryDiscovery(memory_root)

    def get_recent_events(self, days: int = 7):
        """Return the events of the last ``days`` days.

        The query is ordered by ``timestamp desc``; the cutoff is the local
        naive time ``now - days`` rendered with ``datetime.isoformat()``.
        """
        import datetime

        cutoff = datetime.datetime.now() - datetime.timedelta(days=days)
        return self.discovery.query(
            where=f"timestamp >= '{cutoff.isoformat()}'",
            order_by="timestamp desc",
        )

    def get_events_by_type(self, event_type: str, limit: int = 50):
        """Return at most ``limit`` events whose type equals ``event_type``."""
        # NOTE(review): event_type is interpolated into the where clause
        # as-is; a value containing a single quote will break the expression.
        return self.discovery.query(
            where=f"type='{event_type}'",
            order_by="timestamp desc",
            limit=limit,
        )

    def search_by_tags(self, tags: list, match_all: bool = True):
        """Search events by tag.

        Args:
            tags: Tag names to look for. An empty list returns ``[]``
                without querying.
            match_all: When true the conditions are joined with ``AND``
                (every tag must match), otherwise with ``OR``.
        """
        if not tags:
            return []
        operator = " AND " if match_all else " OR "
        # NOTE(review): tags are interpolated as-is; a tag containing a
        # single quote will break the expression.
        where_clause = operator.join(f"tags CONTAINS '{tag}'" for tag in tags)
        return self.discovery.query(where=where_clause)

    def get_event_summary(self, days: int = 30):
        """Summarise the events of the last ``days`` days.

        Returns:
            A dict with ``total`` (event count) and three plain-dict
            histograms: ``by_type``, ``by_tag`` and ``timeline`` (keyed by
            ``event.date``).
        """
        from collections import Counter

        events = self.get_recent_events(days)
        by_type = Counter()
        by_tag = Counter()
        timeline = Counter()
        for event in events:
            by_type[event.type] += 1
            # An event without tags (None) must not abort the whole summary.
            by_tag.update(event.tags or ())
            timeline[event.date] += 1
        # Convert back to plain dicts so callers get the same types as before.
        return {
            'total': len(events),
            'by_type': dict(by_type),
            'by_tag': dict(by_tag),
            'timeline': dict(timeline),
        }
def main():
    """Print a 7-day summary: total count, per-type counts and the top 10 tags."""
    report = MemoryAnalytics('.ai-runtime/memory').get_event_summary(7)
    print(f"最近7天共记录 {report['total']} 个事件")

    # Per-type breakdown, in the order the summary dict yields its keys.
    print("\n事件类型分布:")
    type_counts = report['by_type']
    for kind in type_counts:
        print(f" {kind}: {type_counts[kind]}")

    # Ten most frequent tags. Sorting on the negated count is a stable
    # descending sort, so ties keep their original relative order.
    print("\n热门标签:")
    ranked = sorted(report['by_tag'].items(), key=lambda pair: -pair[1])
    for name, hits in ranked[:10]:
        print(f" {name}: {hits}")


if __name__ == "__main__":
    main()
```
### Web界面集成
```python
#!/usr/bin/env python3
"""简单的记忆查询Web服务"""
from flask import Flask, request, jsonify
from memory_discovery import MemoryDiscovery
app = Flask(__name__)
discovery = MemoryDiscovery('.ai-runtime/memory')


@app.route('/api/events', methods=['GET'])
def get_events():
    """Event query API.

    Query parameters:
        where:  filter expression passed to ``discovery.query`` (default '').
        limit:  maximum number of events (default 50).
        offset: number of events to skip (default 0).
        format: output format passed to ``discovery.format_events``
                (default 'json').
    """
    # NOTE(review): `where` comes straight from the client and is forwarded
    # verbatim to the query engine -- confirm that is safe before exposing
    # this service beyond localhost.
    where = request.args.get('where', '')
    # type=int makes a non-numeric value fall back to the default instead of
    # raising ValueError (which would surface as an HTTP 500).
    limit = request.args.get('limit', 50, type=int)
    offset = request.args.get('offset', 0, type=int)
    format_type = request.args.get('format', 'json')
    events = discovery.query(where=where, limit=limit, offset=offset)
    return discovery.format_events(events, format_type=format_type)
@app.route('/api/stats', methods=['GET'])
def get_stats():
    """Return event statistics for the last ``days`` days (default 30) as JSON.

    The response contains ``total``, the ``by_type`` and ``by_tag``
    histograms, and the covered ``date_range`` (``start`` / ``end`` as ISO
    dates).
    """
    import datetime
    from collections import Counter

    # type=int makes a non-numeric value fall back to the default instead of
    # raising ValueError (which would surface as an HTTP 500).
    days = request.args.get('days', 30, type=int)
    cutoff = datetime.datetime.now() - datetime.timedelta(days=days)
    events = discovery.query(where=f"timestamp >= '{cutoff.isoformat()}'")

    by_type = Counter()
    by_tag = Counter()
    for event in events:
        by_type[event.type] += 1
        # An event without tags (None) must not fail the whole request.
        by_tag.update(event.tags or ())

    stats = {
        'total': len(events),
        'by_type': dict(by_type),
        'by_tag': dict(by_tag),
        'date_range': {
            'start': cutoff.date().isoformat(),
            'end': datetime.date.today().isoformat(),
        },
    }
    return jsonify(stats)
if __name__ == "__main__":
    # Debug mode is deliberately not hard-coded: the Werkzeug debugger lets
    # anyone who can reach the port execute arbitrary code. Enable it
    # explicitly for local development only (e.g. FLASK_DEBUG=1).
    app.run(port=5001)
```
## 自动化脚本
### 每日摘要生成
```bash
#!/bin/bash
# Generate the daily memory summary as a Markdown report.
DATE=$(date +%Y-%m-%d)
OUTPUT_DIR=".ai-runtime/reports"
REPORT="${OUTPUT_DIR}/daily-summary-${DATE}.md"
mkdir -p "$OUTPUT_DIR"

# Open the report once and stream every section into it, instead of
# repeating the path and re-opening the file for each line.
{
  echo "# ${DATE} 记忆摘要"
  echo ""
  echo "## 今日事件统计"
  python3 memory_cli.py query --where "date='${DATE}'" --format json | jq '. | length'
  echo ""
  echo "## 今日事件列表"
  python3 memory_cli.py query --where "date='${DATE}'" --select "timestamp,title,type"
  echo ""
  echo "## 热门标签"
  # `.tags[]?` skips events whose tags field is null instead of failing.
  python3 memory_cli.py query --where "date='${DATE}'" --select "tags" --format json \
    | jq -r '.[].tags[]?' | sort | uniq -c | sort -nr | head -5
} > "$REPORT"
```
### 定期维护脚本
```bash
#!/bin/bash
# Periodic maintenance for the memory system.
MEMORY_ROOT=".ai-runtime/memory"
REPORT_DIR=".ai-runtime/reports"

# Purge short-term memory files older than 30 days.
# -type f restricts the deletion to regular files; without it find would
# also try to remove old directories.
find "${MEMORY_ROOT}/short-term/" -type f -mtime +30 -delete

# Check that every loaded event has the required timestamp and title fields.
python3 -c "
from memory_discovery import MemoryDiscovery
d = MemoryDiscovery('${MEMORY_ROOT}')
invalid = [e for e in d.events if not e.timestamp or not e.title]
if invalid:
    print('发现无效事件:')
    for e in invalid:
        print(f' {e.path}: 缺少必要字段')
else:
    print('所有事件文件有效')
"

# Write the maintenance report. The reports directory may not exist yet, and
# the redirection below would fail without it.
mkdir -p "$REPORT_DIR"
REPORT_FILE="${REPORT_DIR}/maintenance-$(date +%Y%m%d).md"
{
  echo "# 记忆系统维护报告 - $(date)"
  echo ""
  echo "## 统计信息"
  echo "- Episodic事件数量: $(find "${MEMORY_ROOT}/episodic/" -name '*.md' | wc -l)"
  echo "- Long-term文档数量: $(find "${MEMORY_ROOT}/long-term/" -name '*.md' | wc -l)"
  echo "- Short-term文件数量: $(find "${MEMORY_ROOT}/short-term/" -name '*.md' | wc -l)"
} > "$REPORT_FILE"
```
## 高级查询技巧
### 正则表达式搜索
```python
# 使用Python进行高级搜索
import re
from memory_discovery import MemoryDiscovery
discovery = MemoryDiscovery('.ai-runtime/memory')

# Collect the events whose title mentions "OAuth" or "认证" (case-insensitive).
# The pattern is compiled once, outside the loop.
pattern = re.compile(r'OAuth|认证', re.IGNORECASE)
keyword_events = [
    event
    for event in discovery.events
    # An event with no title (None) is treated as an empty title rather than
    # making re raise TypeError.
    if pattern.search(event.title or '')
]
print(f"找到 {len(keyword_events)} 个相关事件")
```
### 时间序列分析
```python
# 分析事件的时间分布
from collections import defaultdict
from memory_discovery import MemoryDiscovery
discovery = MemoryDiscovery('.ai-runtime/memory')
events = discovery.query(order_by="timestamp asc")

# Count events per hour of day.
hourly_stats = defaultdict(int)
for event in events:
    # Events without a timestamp cannot be bucketed; skip them instead of
    # failing with AttributeError on `.hour`.
    # NOTE(review): assumes event.timestamp is datetime-like -- confirm.
    if not event.timestamp:
        continue
    hourly_stats[event.timestamp.hour] += 1

print("事件按小时分布:")
for hour in sorted(hourly_stats):
    print(f" {hour:02d}:00: {hourly_stats[hour]} 个事件")
```
### 相关性分析
```python
# 分析标签共现关系
from collections import defaultdict
from memory_discovery import MemoryDiscovery
discovery = MemoryDiscovery('.ai-runtime/memory')

# Build the symmetric tag co-occurrence matrix:
# cooccurrence[a][b] == number of events carrying both tag a and tag b.
cooccurrence = defaultdict(lambda: defaultdict(int))
for event in discovery.events:
    # De-duplicate so a tag repeated on one event is not paired with itself,
    # and tolerate events without tags (None).
    tags = sorted(set(event.tags or ()))
    for i, tag1 in enumerate(tags):
        for tag2 in tags[i+1:]:
            cooccurrence[tag1][tag2] += 1
            cooccurrence[tag2][tag1] += 1

# Report each unordered pair once (the matrix holds it twice, as a+b and
# b+a), strongest association first, ties broken alphabetically.
pairs = [
    (tag1, tag2, count)
    for tag1, row in cooccurrence.items()
    for tag2, count in row.items()
    if tag1 < tag2 and count > 1  # seen together at least twice
]
pairs.sort(key=lambda item: (-item[2], item[0], item[1]))

print("标签共现分析:")
for tag1, tag2, count in pairs:
    print(f" {tag1} + {tag2}: {count}")
```