#!/usr/bin/env python3
"""
Daily memory check script V2 - improved cross-session message aggregation.

Improvements:

1. Cross-session message aggregation - merge messages from all sessions and sort them by time
2. Stronger event detection - more keywords and pattern matching
3. Session-reset handling - correctly recognize .reset. and .deleted. archive files
4. Full timeline reconstruction - show all of today's activity in chronological order
5. Smarter message filtering - distinguish real user messages from system prompts
"""

import os
import sys
import json
import re
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, List, Dict, Optional, Tuple
from collections import defaultdict


def get_workspace_path() -> Path:
    """Return the workspace path."""
    return Path.home() / ".openclaw" / "workspace"


def get_sessions_path() -> Path:
    """Return the sessions path."""
    return Path.home() / ".openclaw" / "agents" / "main" / "sessions"


def check_today_journal() -> bool:
    """Check whether today's L2 journal entry already exists."""
    workspace = get_workspace_path()
    today = datetime.now().strftime("%Y-%m-%d")
    journal_file = workspace / "memory" / "journal" / f"{today}.md"
    return journal_file.exists()


def get_l0_size() -> int:
    """Return the size of MEMORY.md in bytes."""
    workspace = get_workspace_path()
    memory_file = workspace / "MEMORY.md"
    if memory_file.exists():
        return memory_file.stat().st_size
    return 0


def format_size(size_bytes: int) -> str:
    """Format a file size for display."""
    kb = size_bytes / 1024
    return f"{kb:.1f}KB"


def get_today_session_files() -> List[Dict]:
    """
    Collect today's session files (including .reset. and .deleted. archives),
    sorted by modification time so the full timeline can be rebuilt.
    """
    sessions_dir = get_sessions_path()
    if not sessions_dir.exists():
        return []

    today = datetime.now()
    today_files = []

    # Scan all .jsonl-related files (including .reset. and .deleted. archives)
    for pattern in ["*.jsonl", "*.jsonl.reset.*", "*.jsonl.deleted.*"]:
        for file in sessions_dir.glob(pattern):
            try:
                mtime = datetime.fromtimestamp(file.stat().st_mtime)
                if mtime.date() == today.date():
                    today_files.append({
                        'path': file,
                        'mtime': mtime,
                        'name': file.name
                    })
            except (OSError, ValueError):
                continue

    # Sort by modification time
    today_files.sort(key=lambda x: x['mtime'])
    return today_files
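
# Illustrative note: the .reset./.deleted. archive names below are an assumed naming
# scheme inferred from the glob patterns above, not a confirmed format. Something like:
#   a1b2c3.jsonl                     -> current active session
#   a1b2c3.jsonl.reset.1730000000    -> archived copy left behind by a session reset
#   a1b2c3.jsonl.deleted.1730000000  -> archived copy left behind by a session delete
# All three are treated identically: any file matching one of the patterns and modified
# today is scanned.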


def extract_user_content(text: str) -> Optional[str]:
    """
    Extract the user's actual content from a message text,
    filtering out system prompts, metadata, and similar noise.
    """
    if not text or len(text) < 10:
        return None

    # Skip messages that are pure system prompts
    system_indicators = [
        "OpenClaw runtime context",
        "[Subagent Context]",
        "You are running as a subagent",
        "Results auto-announce",
        "This context is runtime-generated",
        "Keep internal details private",
        "conversation info (untrusted)",
        "feishu control message",
        "feishu event type:",
    ]

    lower_text = text.lower()
    for indicator in system_indicators:
        if indicator.lower() in lower_text[:200]:
            return None

    # Handle the Feishu message format - extract the actual user content.
    # Format: System: [time] Feishu[main] DM from xxx: actual content
    feishu_match = re.search(r'Feishu\[.*?\]\s+\w+\s+from\s+\w+:\s*(.+?)(?=\n\n|$)', text, re.DOTALL)
    if feishu_match:
        content = feishu_match.group(1).strip()
        # Strip JSON metadata blocks
        content = re.sub(r'```json\s*\{.*?\}\s*```', '', content, flags=re.DOTALL)
        content = content.strip()
        if len(content) > 10:
            return content
        return None

    # Plain user message (not a system message): return it directly
    if not text.startswith("System:") and not text.startswith("["):
        return text.strip() if len(text) > 10 else None

    return None
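
# A minimal sketch of the Feishu wrapping this filter assumes (the timestamp, sender,
# and body below are made-up examples, not captured traffic):
#
#   System: [2025-11-03 09:15:02] Feishu[main] DM from zhang: please move the morning report cron job to 8am
#
# extract_user_content() would return "please move the morning report cron job to 8am",
# while a message whose first 200 characters contain e.g. "OpenClaw runtime context"
# is dropped entirely.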


def parse_timestamp(ts: Any) -> Optional[datetime]:
    """
    Parse a timestamp in various formats into a datetime object.
    Supports ISO 8601 strings and millisecond Unix timestamps.
    """
    if not ts:
        return None

    # Numeric value (millisecond Unix timestamp)
    if isinstance(ts, (int, float)):
        # Convert milliseconds to seconds
        ts_sec = ts / 1000 if ts > 1e10 else ts
        try:
            return datetime.fromtimestamp(ts_sec)
        except (ValueError, OSError):
            return None

    # String value (ISO 8601 format)
    if isinstance(ts, str):
        try:
            # Handle UTC times that end in Z
            ts = ts.replace('Z', '+00:00')
            # fromisoformat is available on Python 3.7+
            dt = datetime.fromisoformat(ts)
            # Convert timezone-aware values to naive local time
            if dt.tzinfo is not None:
                dt = dt.astimezone().replace(tzinfo=None)
            return dt
        except (ValueError, TypeError):
            return None

    return None
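
# Illustrative shape of a session .jsonl record as consumed by the parser below; the
# field names are inferred from the parsing code, not from a published format spec:
#
#   {"type": "message", "timestamp": "2025-11-03T01:15:02Z",
#    "message": {"role": "user", "timestamp": 1762132502000,
#                "content": [{"type": "text",
#                             "text": "System: [2025-11-03 09:15:02] Feishu[main] DM from zhang: ..."}]}}
#
# Either the record-level ISO timestamp or the message-level millisecond timestamp may
# be missing; extract_messages_from_session() falls back through them in that order.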


def extract_messages_from_session(file_info: Dict) -> List[Dict]:
    """
    Extract every real user message from a session file.

    Improvements:
    1. Parse each message's own timestamp (instead of using the file's mtime)
    2. Pull the real send time out of Feishu message text
    3. Better content dedup and filtering
    """
    messages = []
    file_path = file_info['path']
    session_name = file_info['name']

    try:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            lines = f.readlines()

        for line in lines:
            line = line.strip()
            if not line:
                continue

            try:
                record = json.loads(line)

                # Only handle message records
                if record.get("type") != "message":
                    continue

                msg = record.get("message", {})
                if not msg:
                    continue

                # Only extract user messages
                if msg.get("role") != "user":
                    continue

                content_list = msg.get("content", [])
                if not content_list:
                    continue

                # Extract the text content (only the first text item is considered)
                for item in content_list:
                    if isinstance(item, dict) and item.get("type") == "text":
                        text = item.get("text", "")

                        # Extract the real user content (filters out system messages)
                        user_content = extract_user_content(text)
                        if not user_content:
                            break

                        # Timestamp resolution order:
                        # 1. record-level timestamp (ISO 8601)
                        # 2. timestamp inside the message (millisecond Unix)
                        # 3. time embedded in the Feishu message text
                        # 4. finally, the file's modification time

                        msg_time = None
                        time_source = "unknown"

                        # Try the record-level timestamp
                        record_ts = record.get("timestamp")
                        if record_ts:
                            msg_time = parse_timestamp(record_ts)
                            if msg_time:
                                time_source = "record"

                        # Try the timestamp inside the message (millisecond Unix)
                        if not msg_time and "timestamp" in msg:
                            msg_time = parse_timestamp(msg.get("timestamp"))
                            if msg_time:
                                time_source = "message"

                        # Try to pull a time out of the Feishu message text
                        if not msg_time:
                            feishu_time_match = re.search(r'\[(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})', text)
                            if feishu_time_match:
                                try:
                                    msg_time = datetime.strptime(feishu_time_match.group(1), "%Y-%m-%d %H:%M:%S")
                                    time_source = "feishu_text"
                                except ValueError:
                                    pass

                        # Fall back to the file's modification time
                        if not msg_time:
                            msg_time = file_info['mtime']
                            time_source = "file_mtime"

                        messages.append({
                            'timestamp': msg_time.isoformat() if msg_time else "",
                            'timestamp_dt': msg_time,
                            'content': user_content[:400],  # cap the length
                            'session': session_name,
                            'time_str': msg_time.strftime('%H:%M:%S') if msg_time else 'unknown',
                            'time_source': time_source,
                            'content_hash': hash(user_content[:100])  # used for dedup
                        })
                        break

            except json.JSONDecodeError:
                continue

    except (IOError, OSError) as e:
        print(f"  Warning: could not read file {file_info['name']}: {e}")

    return messages


def deduplicate_messages(messages: List[Dict]) -> List[Dict]:
    """
    Deduplicate messages across sessions.
    A message is considered a duplicate based on its content hash and a time window.
    """
    if not messages:
        return []

    # Sort by time first
    messages.sort(key=lambda x: x.get('timestamp_dt') or datetime.min)

    deduped = []
    seen_hashes = {}  # hash -> (timestamp, content_preview)

    # Time window: identical content within 5 minutes counts as a duplicate
    time_window = timedelta(minutes=5)

    for msg in messages:
        content_hash = msg.get('content_hash')
        msg_time = msg.get('timestamp_dt')

        if content_hash is None:
            deduped.append(msg)
            continue

        # Check whether a similar message was already seen
        is_duplicate = False
        if content_hash in seen_hashes:
            last_time, last_preview = seen_hashes[content_hash]
            if msg_time and last_time:
                if abs((msg_time - last_time).total_seconds()) < time_window.total_seconds():
                    is_duplicate = True
                    # Keep the more detailed (longer) version of the message
                    if len(msg.get('content', '')) > len(last_preview):
                        # Replace the previously kept message
                        for i, existing in enumerate(deduped):
                            if existing.get('content_hash') == content_hash:
                                deduped[i] = msg
                                seen_hashes[content_hash] = (msg_time, msg.get('content', '')[:100])
                                break

        if not is_duplicate:
            deduped.append(msg)
            if content_hash:
                seen_hashes[content_hash] = (msg_time, msg.get('content', '')[:100])

    return deduped
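
# A minimal walk-through of the dedup rule above (times and text invented for
# illustration): two messages whose first 100 characters hash identically and whose
# timestamps are 3 minutes apart collapse into one entry, with the more detailed
# (longer) copy kept; the same content seen again 20 minutes later falls outside the
# window and is treated as a new, separate message.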


def aggregate_messages_across_sessions(session_files: List[Dict]) -> List[Dict]:
    """
    Aggregate messages across all sessions and sort them by time.

    Improvements:
    1. Parse each message's real timestamp
    2. Deduplicate across sessions (handles duplicates caused by session resets)
    3. Rebuild the full timeline
    """
    all_messages = []

    print(f"\n  Processing {len(session_files)} session files...")

    for file_info in session_files:
        messages = extract_messages_from_session(file_info)
        if messages:
            all_messages.extend(messages)
            # Show per-file time-source statistics
            time_sources = {}
            for m in messages:
                src = m.get('time_source', 'unknown')
                time_sources[src] = time_sources.get(src, 0) + 1
            print(f"  📄 {file_info['name'][:30]}...: {len(messages)} messages")
            for src, count in time_sources.items():
                print(f"      └─ {src}: {count}")

    if not all_messages:
        return []

    # Deduplicate (handles duplicates caused by session resets)
    print(f"\n  🔄 Raw message count: {len(all_messages)}")
    all_messages = deduplicate_messages(all_messages)
    print(f"  ✅ After dedup: {len(all_messages)}")

    # Sort by timestamp to rebuild the full timeline
    all_messages.sort(key=lambda x: x.get('timestamp_dt') or datetime.min)

    return all_messages


def detect_important_events(messages: List[Dict]) -> List[Dict]:
    """
    Detect important events in the aggregated messages.
    Enhanced version: more keywords plus light context analysis.
    """
    # Expanded keyword list (keys and keywords stay in the original language because
    # they are matched against, and displayed alongside, Chinese user messages)
    important_keywords = {
        '配置变更': ['配置', 'config', 'setup', 'settings', '修改', '变更', '更新'],
        '技能操作': ['技能', 'skill', '安装', '创建', '卸载', '删除', '移除'],
        '定时任务': ['定时', 'cron', '任务', 'schedule', 'job', '早报'],
        '调试排错': ['调试', '测试', 'test', 'debug', '错误', 'error', '失败', 'fail', '问题'],
        '决策讨论': ['决策', '决定', '方案', '选择', '最终', '结论', '分析'],
        '搜索查询': ['搜索', '查找', 'query', 'find', 'check', '查询'],
        'API集成': ['api', 'key', 'token', '集成', 'integration'],
        '系统维护': ['重启', 'reset', 'restart', '维护', '清理', 'gateway'],
        '代码提交': ['git', '提交', 'commit', 'push', 'pr'],
        '文档记录': ['记录', '文档', 'journal', 'memory', '笔记'],
    }

    events = []
    seen_contents = set()  # used for dedup

    for msg in messages:
        content = msg['content']
        content_hash = content[:100]  # first 100 characters act as the dedup key

        if content_hash in seen_contents:
            continue
        seen_contents.add(content_hash)

        # Check whether the message matches any keyword category; the first matching
        # category wins, and each message yields at most one event.
        for category, keywords in important_keywords.items():
            for keyword in keywords:
                if keyword.lower() in content.lower():
                    events.append({
                        'time': msg.get('time_str', 'unknown'),
                        'category': category,
                        'content': content[:200] + '...' if len(content) > 200 else content,
                        'session': msg.get('session', 'unknown')[:20]
                    })
                    break
            else:
                continue
            break

    return events
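
# Quick illustration of the matching rule above (invented message): a message that
# contains both "cron" and "debug" produces exactly one event, tagged with the first
# category in the dict that matches ('定时任务' here), because categories are checked
# in insertion order and the outer loop breaks after the first hit.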


def generate_daily_summary(events: List[Dict]) -> str:
    """Generate a summary of today's activity."""
    if not events:
        return "No notable activity recorded today"

    summary = f"\n📋 Today's activity summary ({len(events)} events):\n"
    summary += "=" * 60 + "\n"

    # Group by category
    by_category = defaultdict(list)
    for event in events:
        by_category[event['category']].append(event)

    for category, cat_events in sorted(by_category.items()):
        summary += f"\n【{category}】({len(cat_events)})\n"
        for i, event in enumerate(cat_events[:3], 1):
            summary += f"  {i}. [{event['time']}] {event['content']}\n"
        if len(cat_events) > 3:
            summary += f"  ... plus {len(cat_events) - 3} more related events\n"

    return summary


def analyze_sessions_for_events() -> Tuple[bool, List[Dict], str]:
    """
    Analyze all of today's sessions and check whether important events still need recording.
    Returns: (needs follow-up recording, event list, summary text)
    """
    print("\n" + "=" * 60)
    print("🔍 Cross-session message aggregation analysis V2")
    print("=" * 60)

    session_files = get_today_session_files()

    if not session_files:
        print("\n⚠️ No session files found for today")
        return False, [], "No session files found"

    print(f"\n📁 Found {len(session_files)} session files (archives included):")
    current_count = sum(1 for f in session_files if '.reset.' not in f['name'] and '.deleted.' not in f['name'])
    reset_count = sum(1 for f in session_files if '.reset.' in f['name'])
    deleted_count = sum(1 for f in session_files if '.deleted.' in f['name'])

    print(f"  - active: {current_count}")
    print(f"  - reset archives: {reset_count}")
    print(f"  - deleted archives: {deleted_count}")

    # Key step: aggregate messages across all sessions
    print("\n🔄 Aggregating real user messages from every session...")
    all_messages = aggregate_messages_across_sessions(session_files)

    if not all_messages:
        print("  ⚠️ No real user messages extracted (system prompts filtered out)")
        return False, [], "No user messages extracted"

    print(f"  ✅ Aggregated {len(all_messages)} user messages (system messages filtered out)")

    # Show the activity time span
    print("\n⏱️ Activity time span:")
    first_time = all_messages[0].get('time_str', 'unknown')
    last_time = all_messages[-1].get('time_str', 'unknown')
    print(f"  start: {first_time}")
    print(f"  end:   {last_time}")

    # Show the per-session message distribution
    session_stats = defaultdict(int)
    for msg in all_messages:
        session_stats[msg.get('session', 'unknown')[:20]] += 1

    print("\n📊 Message distribution per session:")
    for session_name, count in sorted(session_stats.items(), key=lambda x: -x[1])[:5]:
        print(f"  - {session_name}: {count} messages")

    # Detect important events
    print("\n🎯 Detecting important events...")
    events = detect_important_events(all_messages)

    if events:
        print(f"  ✅ Identified {len(events)} important events (after dedup)")
    else:
        print("  ℹ️ No important events identified")

    # Generate the summary
    summary = generate_daily_summary(events)
    print(summary)

    # Decide whether a follow-up journal entry is needed
    has_today_journal = check_today_journal()
    needs_update = len(events) >= 3 and not has_today_journal

    if needs_update:
        print(f"\n🚨 Gap found: {len(events)} important events today, but nothing written to L2")
        print("  Suggestion: run 'backfill today's L2 journal'")
    elif has_today_journal:
        print("\n✅ L2 already recorded; cross-session aggregation complete")
        print(f"  Processed {len(session_files)} sessions, extracted {len(all_messages)} messages")
    else:
        print("\n⚠️ No notable activity today, or everything is already recorded")

    return needs_update, events, summary


def main():
    """Main entry point."""
    today_str = datetime.now().strftime("%Y-%m-%d")
    print(f"📅 Date check: {today_str}")

    # Key step: cross-session aggregation analysis
    needs_update, events, summary = analyze_sessions_for_events()

    # Check the L0 size
    l0_size = get_l0_size()
    print("\n📊 L0 (MEMORY.md) size check:")
    print(f"  current: {format_size(l0_size)} / 4KB")

    if l0_size > 4096:
        print("  🚨 Warning: over the 4KB hard limit! Archive to L1 immediately")
    elif l0_size > 3500:
        print("  ⚠️ Reminder: approaching the 4KB limit, consider preparing an archive")
    else:
        print("  ✅ Size OK")

    # Maintenance checklist
    print("\n" + "=" * 60)
    print("📋 Daily maintenance checklist:")

    has_today_journal = check_today_journal()
    if has_today_journal:
        print("  [x] L2 entry already exists")
    else:
        print("  [ ] Write today's L2 entry if there were important events")

    if events:
        print(f"  [x] Scanned and aggregated {len(events)} important events (cross-session)")
    else:
        print("  [-] No notable activity today")

    print("  [ ] Review the recent-activity summary in MEMORY.md")
    if l0_size > 3500:
        print("  [ ] L0 is near its limit, consider archiving to L1")
    print("  [ ] Confirm the L0 reference links are still valid")

    print("\n💡 What changed in V2:")
    print("  - Added cross-session message aggregation")
    print("  - Smarter filtering of system prompt messages")
    print("  - Automatic handling of .reset. and .deleted. archive files")
    print("  - Full activity record rebuilt along the timeline")

    # Exit status
    if needs_update:
        return 2  # follow-up journal entry needed
    elif not has_today_journal:
        return 1  # no L2 entry yet
    else:
        return 0  # all good


if __name__ == "__main__":
    sys.exit(main())
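# Example invocation (the filename is whatever this script is saved as; exit codes
# follow main() above: 0 = everything recorded, 1 = no L2 journal yet, 2 = backfill
# recommended):
#
#   python3 daily_memory_check_v2.py
#   echo "exit code: $?"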