Memory System Usage Examples
Quick Start
Environment Setup
# Change into the project directory
cd /path/to/ai-runtime
# Change into the memory system directory
cd .ai-runtime/memory
Your First Query
# Show all memory events
python3 memory_cli.py query
# Show today's events
python3 memory_cli.py query --where "date='$(date +%Y-%m-%d)'"
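The same queries can also be issued from Python through the MemoryDiscovery class covered later in this guide. A minimal sketch, assuming memory_discovery is importable from this directory and that query() accepts the where/order_by arguments shown in the integration examples below:
import datetime
from memory_discovery import MemoryDiscovery
# Point the discovery layer at the memory root
discovery = MemoryDiscovery('.ai-runtime/memory')
# All events, newest first
all_events = discovery.query(order_by="timestamp desc")
# Only today's events
today = datetime.date.today().isoformat()
todays_events = discovery.query(where=f"date='{today}'")
print(f"{len(all_events)} events total, {len(todays_events)} today")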
Episodic Memory Query Examples
Time-Range Queries
# Show this week's events (uses GNU date -d)
python3 memory_cli.py query \
--where "date>='$(date -d 'last monday' +%Y-%m-%d)'" \
--order-by "timestamp desc"
# Show events from the last 24 hours
python3 memory_cli.py query \
--where "timestamp >= '$(date -d '24 hours ago' +%Y-%m-%dT%H:%M:%S)'" \
--order-by "timestamp desc"
Filtering by Type
# Show all decision events
python3 memory_cli.py query \
--where "type='decision'" \
--order-by "timestamp desc"
# Show error events
python3 memory_cli.py query \
--where "type='error'" \
--select "timestamp,title,tags" \
--limit 10
Tag Queries
# Find architecture-related events
python3 memory_cli.py query \
--where "tags CONTAINS 'architecture'"
# Find security-related decisions
python3 memory_cli.py query \
--where "type='decision' AND tags CONTAINS 'security'"
Complex Combined Queries
# Find architecture decisions made this month
python3 memory_cli.py query \
--where "date>='$(date +%Y-%m-01)' AND type='decision' AND tags CONTAINS 'architecture'" \
--order-by "timestamp desc" \
--limit 20
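When filters like this are built programmatically, assembling the WHERE string in Python keeps the quoting manageable. A small sketch, assuming the same query() interface as elsewhere in this guide:
import datetime
from memory_discovery import MemoryDiscovery

discovery = MemoryDiscovery('.ai-runtime/memory')
# First day of the current month, e.g. '2024-05-01'
month_start = datetime.date.today().replace(day=1).isoformat()
# Mirror of the CLI example above: this month's architecture decisions
where = (
    f"date>='{month_start}' "
    "AND type='decision' "
    "AND tags CONTAINS 'architecture'"
)
decisions = discovery.query(where=where, order_by="timestamp desc", limit=20)
for event in decisions:
    print(event.timestamp, event.title)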
Data Export and Analysis
Exporting to JSON
# Export the last 7 days of events as JSON
python3 memory_cli.py query \
--where "date>='$(date -d '7 days ago' +%Y-%m-%d)'" \
--format json \
--order-by "timestamp desc" > weekly-events.json
Statistical Analysis
# Count events by type
python3 memory_cli.py query \
--select "type" \
--format json | jq -r '.[].type' | sort | uniq -c | sort -nr
# Count the most frequent tags
python3 memory_cli.py query \
--select "tags" \
--format json | jq -r '.[].tags[]' | sort | uniq -c | sort -nr | head -10
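If jq is not available, the same counts can be produced in Python with collections.Counter. A sketch, assuming events expose the type and tags attributes used elsewhere in this guide:
from collections import Counter
from memory_discovery import MemoryDiscovery

discovery = MemoryDiscovery('.ai-runtime/memory')
# Events per type
type_counts = Counter(event.type for event in discovery.events)
# Tag occurrences across all events
tag_counts = Counter(tag for event in discovery.events for tag in event.tags)
print(type_counts.most_common())
print(tag_counts.most_common(10))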
Timeline Analysis
# Count the number of events per day
python3 memory_cli.py query \
--select "date" \
--format json | jq -r '.[].date' | sort | uniq -c | sort
Using the Programming Interface
Python Integration Example
#!/usr/bin/env python3
"""Memory system integration example."""
import datetime

from memory_discovery import MemoryDiscovery


class MemoryAnalytics:
    """Analytics helpers on top of the memory system."""

    def __init__(self, memory_root: str):
        self.discovery = MemoryDiscovery(memory_root)

    def get_recent_events(self, days: int = 7):
        """Return all events from the last N days."""
        cutoff = datetime.datetime.now() - datetime.timedelta(days=days)
        return self.discovery.query(
            where=f"timestamp >= '{cutoff.isoformat()}'",
            order_by="timestamp desc"
        )

    def get_events_by_type(self, event_type: str, limit: int = 50):
        """Return events of a given type."""
        return self.discovery.query(
            where=f"type='{event_type}'",
            order_by="timestamp desc",
            limit=limit
        )

    def search_by_tags(self, tags: list, match_all: bool = True):
        """Search events by tags."""
        if not tags:
            return []
        conditions = [f"tags CONTAINS '{tag}'" for tag in tags]
        operator = " AND " if match_all else " OR "
        where_clause = operator.join(conditions)
        return self.discovery.query(where=where_clause)

    def get_event_summary(self, days: int = 30):
        """Build summary statistics for recent events."""
        events = self.get_recent_events(days)
        summary = {
            'total': len(events),
            'by_type': {},
            'by_tag': {},
            'timeline': {}
        }
        for event in events:
            # Count by type
            summary['by_type'][event.type] = summary['by_type'].get(event.type, 0) + 1
            # Count by tag
            for tag in event.tags:
                summary['by_tag'][tag] = summary['by_tag'].get(tag, 0) + 1
            # Count by date
            date_str = event.date
            summary['timeline'][date_str] = summary['timeline'].get(date_str, 0) + 1
        return summary


def main():
    analytics = MemoryAnalytics('.ai-runtime/memory')
    # Summary of the last 7 days
    summary = analytics.get_event_summary(7)
    print(f"{summary['total']} events recorded in the last 7 days")
    # Type distribution
    print("\nEvents by type:")
    for event_type, count in summary['by_type'].items():
        print(f"  {event_type}: {count}")
    # Top tags
    print("\nTop tags:")
    sorted_tags = sorted(summary['by_tag'].items(), key=lambda x: x[1], reverse=True)
    for tag, count in sorted_tags[:10]:
        print(f"  {tag}: {count}")


if __name__ == "__main__":
    main()
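Saved to a file, the class can also be reused from other scripts. A brief usage sketch, where memory_analytics.py is a hypothetical filename chosen here for illustration:
# assumes the class above is saved as memory_analytics.py (hypothetical name)
from memory_analytics import MemoryAnalytics

analytics = MemoryAnalytics('.ai-runtime/memory')
# Five most recent error events
for event in analytics.get_events_by_type('error', limit=5):
    print(event.timestamp, event.title)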
Web Interface Integration
#!/usr/bin/env python3
"""A simple web service for querying memories."""
import datetime

from flask import Flask, request, jsonify

from memory_discovery import MemoryDiscovery

app = Flask(__name__)
discovery = MemoryDiscovery('.ai-runtime/memory')


@app.route('/api/events', methods=['GET'])
def get_events():
    """Query events."""
    where = request.args.get('where', '')
    limit = int(request.args.get('limit', 50))
    offset = int(request.args.get('offset', 0))
    format_type = request.args.get('format', 'json')
    events = discovery.query(where=where, limit=limit, offset=offset)
    return discovery.format_events(events, format_type=format_type)


@app.route('/api/stats', methods=['GET'])
def get_stats():
    """Return summary statistics."""
    days = int(request.args.get('days', 30))
    cutoff = datetime.datetime.now() - datetime.timedelta(days=days)
    events = discovery.query(where=f"timestamp >= '{cutoff.isoformat()}'")
    stats = {
        'total': len(events),
        'by_type': {},
        'by_tag': {},
        'date_range': {
            'start': cutoff.date().isoformat(),
            'end': datetime.date.today().isoformat()
        }
    }
    for event in events:
        stats['by_type'][event.type] = stats['by_type'].get(event.type, 0) + 1
        for tag in event.tags:
            stats['by_tag'][tag] = stats['by_tag'].get(tag, 0) + 1
    return jsonify(stats)


if __name__ == "__main__":
    app.run(debug=True, port=5001)
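Once the service is running, the endpoints can be exercised with any HTTP client. A minimal sketch using only the standard library, assuming the service listens on port 5001 as configured above and that format_events() returns JSON for the default format:
import json
import urllib.parse
import urllib.request

BASE = "http://localhost:5001"
# Fetch the ten most recent decision events
params = urllib.parse.urlencode({"where": "type='decision'", "limit": 10})
with urllib.request.urlopen(f"{BASE}/api/events?{params}") as resp:
    events = json.loads(resp.read())
# Fetch statistics for the last 7 days
with urllib.request.urlopen(f"{BASE}/api/stats?days=7") as resp:
    stats = json.loads(resp.read())
print(stats["total"], "events in the last 7 days")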
Automation Scripts
Daily Summary Generation
#!/bin/bash
# Generate a daily memory summary
DATE=$(date +%Y-%m-%d)
OUTPUT_DIR=".ai-runtime/reports"
SUMMARY_FILE="${OUTPUT_DIR}/daily-summary-${DATE}.md"
mkdir -p "${OUTPUT_DIR}"
echo "# Memory Summary for ${DATE}" > "${SUMMARY_FILE}"
echo "" >> "${SUMMARY_FILE}"
echo "## Today's Event Count" >> "${SUMMARY_FILE}"
python3 memory_cli.py query --where "date='${DATE}'" --format json | jq '. | length' >> "${SUMMARY_FILE}"
echo "" >> "${SUMMARY_FILE}"
echo "## Today's Events" >> "${SUMMARY_FILE}"
python3 memory_cli.py query --where "date='${DATE}'" --select "timestamp,title,type" >> "${SUMMARY_FILE}"
echo "" >> "${SUMMARY_FILE}"
echo "## Top Tags" >> "${SUMMARY_FILE}"
python3 memory_cli.py query --where "date='${DATE}'" --select "tags" --format json | jq -r '.[].tags[]' | sort | uniq -c | sort -nr | head -5 >> "${SUMMARY_FILE}"
Periodic Maintenance Script
#!/bin/bash
# Periodic maintenance for the memory system
MEMORY_ROOT=".ai-runtime/memory"
# Remove short-term memory files older than 30 days
find "${MEMORY_ROOT}/short-term/" -type f -mtime +30 -delete
# Check episodic files for consistency
python3 -c "
from memory_discovery import MemoryDiscovery
d = MemoryDiscovery('${MEMORY_ROOT}')
invalid = [e for e in d.events if not e.timestamp or not e.title]
if invalid:
    print('Invalid events found:')
    for e in invalid:
        print(f'  {e.path}: missing required fields')
else:
    print('All event files are valid')
"
# Write a maintenance report
REPORT_FILE=".ai-runtime/reports/maintenance-$(date +%Y%m%d).md"
echo "# Memory System Maintenance Report - $(date)" > "${REPORT_FILE}"
echo "" >> "${REPORT_FILE}"
echo "## Statistics" >> "${REPORT_FILE}"
echo "- Episodic events: $(find ${MEMORY_ROOT}/episodic/ -name '*.md' | wc -l)" >> "${REPORT_FILE}"
echo "- Long-term documents: $(find ${MEMORY_ROOT}/long-term/ -name '*.md' | wc -l)" >> "${REPORT_FILE}"
echo "- Short-term files: $(find ${MEMORY_ROOT}/short-term/ -name '*.md' | wc -l)" >> "${REPORT_FILE}"
Advanced Query Techniques
Regular Expression Search
# Advanced search from Python
import re

from memory_discovery import MemoryDiscovery

discovery = MemoryDiscovery('.ai-runtime/memory')
# Find events whose title mentions a keyword
keyword_events = []
for event in discovery.events:
    if re.search(r'OAuth|authentication', event.title, re.IGNORECASE):
        keyword_events.append(event)
print(f"Found {len(keyword_events)} matching events")
Time-Series Analysis
# Analyze how events are distributed over time
from collections import defaultdict

from memory_discovery import MemoryDiscovery

discovery = MemoryDiscovery('.ai-runtime/memory')
events = discovery.query(order_by="timestamp asc")
# Count events per hour of day
hourly_stats = defaultdict(int)
for event in events:
    hour = event.timestamp.hour
    hourly_stats[hour] += 1
print("Events by hour of day:")
for hour in sorted(hourly_stats.keys()):
    print(f"  {hour:02d}:00: {hourly_stats[hour]} events")
Correlation Analysis
# Analyze tag co-occurrence
from collections import defaultdict

from memory_discovery import MemoryDiscovery

discovery = MemoryDiscovery('.ai-runtime/memory')
# Build a symmetric tag co-occurrence matrix
cooccurrence = defaultdict(lambda: defaultdict(int))
for event in discovery.events:
    tags = event.tags
    for i, tag1 in enumerate(tags):
        for tag2 in tags[i + 1:]:
            cooccurrence[tag1][tag2] += 1
            cooccurrence[tag2][tag1] += 1
# Show the most strongly related tag pairs
print("Tag co-occurrence:")
for tag1 in sorted(cooccurrence.keys()):
    for tag2, count in sorted(cooccurrence[tag1].items(), key=lambda x: x[1], reverse=True):
        # The matrix is symmetric, so print each pair only once,
        # and only pairs that co-occur at least twice
        if count > 1 and tag1 < tag2:
            print(f"  {tag1} + {tag2}: {count} times")