#!/usr/bin/env python3 """ 每日记忆检查脚本 V2 - 增强跨 session 消息聚合能力 优化点: 1. 跨 session 消息聚合 - 合并所有 session 的消息按时间排序 2. 增强事件检测 - 支持更多关键词和模式匹配 3. 处理 session 重置 - 正确识别 .reset. 和 .deleted. 文件 4. 完整时间线生成 - 按时间顺序展示今日所有活动 5. 智能消息过滤 - 区分真实用户消息和系统提示 """ import os import sys import json import re from datetime import datetime, timedelta from pathlib import Path from typing import List, Dict, Optional, Tuple from collections import defaultdict def get_workspace_path() -> Path: """获取 workspace 路径。""" return Path.home() / ".openclaw" / "workspace" def get_sessions_path() -> Path: """获取 sessions 路径。""" return Path.home() / ".openclaw" / "agents" / "main" / "sessions" def check_today_journal() -> bool: """检查今日是否已有 L2 记录。""" workspace = get_workspace_path() today = datetime.now().strftime("%Y-%m-%d") journal_file = workspace / "memory" / "journal" / f"{today}.md" return journal_file.exists() def get_l0_size() -> int: """获取 MEMORY.md 文件大小(字节)。""" workspace = get_workspace_path() memory_file = workspace / "MEMORY.md" if memory_file.exists(): return memory_file.stat().st_size return 0 def format_size(size_bytes: int) -> str: """格式化文件大小显示。""" kb = size_bytes / 1024 return f"{kb:.1f}KB" def get_today_session_files() -> List[Dict]: """ 获取今日所有 session 文件(包括 .reset. 和 .deleted. 归档) 按修改时间排序,确保能重建完整时间线 """ sessions_dir = get_sessions_path() if not sessions_dir.exists(): return [] today = datetime.now() today_files = [] # 扫描所有 .jsonl 相关文件(包括 .reset. 和 .deleted.) for pattern in ["*.jsonl", "*.jsonl.reset.*", "*.jsonl.deleted.*"]: for file in sessions_dir.glob(pattern): try: mtime = datetime.fromtimestamp(file.stat().st_mtime) if mtime.date() == today.date(): today_files.append({ 'path': file, 'mtime': mtime, 'name': file.name }) except (OSError, ValueError): continue # 按修改时间排序 today_files.sort(key=lambda x: x['mtime']) return today_files def extract_user_content(text: str) -> Optional[str]: """ 从消息文本中提取用户的实际内容 过滤掉系统提示、元数据等 """ if not text or len(text) < 10: return None # 跳过纯系统提示消息 system_indicators = [ "OpenClaw runtime context", "[Subagent Context]", "You are running as a subagent", "Results auto-announce", "This context is runtime-generated", "Keep internal details private", "conversation info (untrusted)", "feishu control message", "feishu event type:", ] lower_text = text.lower() for indicator in system_indicators: if indicator.lower() in lower_text[:200]: return None # 处理飞书消息格式 - 提取实际用户内容 # 格式:System: [时间] Feishu[main] DM from xxx: 实际内容 feishu_match = re.search(r'Feishu\[.*?\]\s+\w+\s+from\s+\w+:\s*(.+?)(?=\n\n|$)', text, re.DOTALL) if feishu_match: content = feishu_match.group(1).strip() # 移除 JSON 元数据块 content = re.sub(r'```json\s*\{.*?\}\s*```', '', content, flags=re.DOTALL) content = content.strip() if len(content) > 10: return content return None # 如果是普通用户消息(非系统消息),直接返回 if not text.startswith("System:") and not text.startswith("["): return text.strip() if len(text) > 10 else None return None def parse_timestamp(ts: any) -> Optional[datetime]: """ 解析各种格式的时间戳为 datetime 对象 支持 ISO 8601 字符串和毫秒级 Unix 时间戳 """ if not ts: return None # 如果是数字(毫秒级 Unix 时间戳) if isinstance(ts, (int, float)): # 毫秒转秒 ts_sec = ts / 1000 if ts > 1e10 else ts try: return datetime.fromtimestamp(ts_sec) except (ValueError, OSError): return None # 如果是字符串(ISO 8601 格式) if isinstance(ts, str): try: # 处理带 Z 的 UTC 时间 ts = ts.replace('Z', '+00:00') # Python 3.7+ 支持 fromisoformat from datetime import timezone dt = datetime.fromisoformat(ts) # 转换为本地时间 if dt.tzinfo is not None: dt = dt.replace(tzinfo=None) return dt except (ValueError, TypeError): return None return None def extract_messages_from_session(file_info: Dict) -> List[Dict]: """ 从 session 文件中提取所有真实用户消息 优化版: 1. 正确解析消息时间戳(而非使用文件修改时间) 2. 提取飞书消息中的真实发送时间 3. 改进内容去重和过滤 """ messages = [] file_path = file_info['path'] session_name = file_info['name'] try: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: lines = f.readlines() for line in lines: line = line.strip() if not line: continue try: record = json.loads(line) # 只处理消息类型 if record.get("type") != "message": continue msg = record.get("message", {}) if not msg: continue # 只提取用户消息 if msg.get("role") != "user": continue content_list = msg.get("content", []) if not content_list: continue # 提取文本内容 for item in content_list: if isinstance(item, dict) and item.get("type") == "text": text = item.get("text", "") # 提取真实用户内容(过滤系统消息) user_content = extract_user_content(text) if not user_content: break # 解析时间戳 - 优先级: # 1. record 级别的时间戳(ISO 8601) # 2. message 内部的 timestamp(毫秒 Unix) # 3. 从飞书消息文本中提取时间 # 4. 最后使用文件修改时间 msg_time = None time_source = "unknown" # 尝试从 record 获取时间戳 record_ts = record.get("timestamp") if record_ts: msg_time = parse_timestamp(record_ts) if msg_time: time_source = "record" # 尝试从 message 内部获取时间戳(毫秒 Unix) if not msg_time and "timestamp" in msg: msg_time = parse_timestamp(msg.get("timestamp")) if msg_time: time_source = "message" # 尝试从飞书消息文本中提取时间 if not msg_time: feishu_time_match = re.search(r'\[(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})', text) if feishu_time_match: try: msg_time = datetime.strptime(feishu_time_match.group(1), "%Y-%m-%d %H:%M:%S") time_source = "feishu_text" except ValueError: pass # 最后使用文件修改时间 if not msg_time: msg_time = file_info['mtime'] time_source = "file_mtime" messages.append({ 'timestamp': msg_time.isoformat() if msg_time else "", 'timestamp_dt': msg_time, 'content': user_content[:400], # 限制长度 'session': session_name, 'time_str': msg_time.strftime('%H:%M:%S') if msg_time else 'unknown', 'time_source': time_source, 'content_hash': hash(user_content[:100]) # 用于去重 }) break except json.JSONDecodeError: continue except (IOError, OSError) as e: print(f" 警告:无法读取文件 {file_info['name']}: {e}") return messages def deduplicate_messages(messages: List[Dict]) -> List[Dict]: """ 对跨 session 的消息进行去重 基于内容哈希和时间窗口判断是否为重复消息 """ if not messages: return [] # 先按时间排序 messages.sort(key=lambda x: x.get('timestamp_dt') or datetime.min) deduped = [] seen_hashes = {} # hash -> (timestamp, content_preview) # 时间窗口:5分钟内相同内容的视为重复 time_window = timedelta(minutes=5) for msg in messages: content_hash = msg.get('content_hash') msg_time = msg.get('timestamp_dt') if content_hash is None: deduped.append(msg) continue # 检查是否已有相似消息 is_duplicate = False if content_hash in seen_hashes: last_time, last_preview = seen_hashes[content_hash] if msg_time and last_time: if abs((msg_time - last_time).total_seconds()) < time_window.total_seconds(): is_duplicate = True # 保留更详细的消息(更长的内容) if len(msg.get('content', '')) > len(last_preview): # 替换之前的消息 for i, existing in enumerate(deduped): if existing.get('content_hash') == content_hash: deduped[i] = msg seen_hashes[content_hash] = (msg_time, msg.get('content', '')[:100]) break if not is_duplicate: deduped.append(msg) if content_hash: seen_hashes[content_hash] = (msg_time, msg.get('content', '')[:100]) return deduped def aggregate_messages_across_sessions(session_files: List[Dict]) -> List[Dict]: """ 跨 session 聚合所有消息,按时间排序 优化版: 1. 正确解析每条消息的真实时间戳 2. 跨 session 去重(处理 session 重置导致的重复消息) 3. 重建完整时间线 """ all_messages = [] print(f"\n 正在处理 {len(session_files)} 个 session 文件...") for file_info in session_files: messages = extract_messages_from_session(file_info) if messages: all_messages.extend(messages) # 显示时间源统计 time_sources = {} for m in messages: src = m.get('time_source', 'unknown') time_sources[src] = time_sources.get(src, 0) + 1 print(f" 📄 {file_info['name'][:30]}...: {len(messages)} 条消息") for src, count in time_sources.items(): print(f" └─ {src}: {count}") if not all_messages: return [] # 去重(处理 session 重置导致的重复) print(f"\n 🔄 原始消息数: {len(all_messages)}") all_messages = deduplicate_messages(all_messages) print(f" ✅ 去重后消息数: {len(all_messages)}") # 按时间戳排序,重建完整时间线 all_messages.sort(key=lambda x: x.get('timestamp_dt') or datetime.min) return all_messages def detect_important_events(messages: List[Dict]) -> List[Dict]: """ 从聚合后的消息中检测重要事件 增强版:支持更多关键词和上下文分析 """ # 扩展关键词列表 important_keywords = { '配置变更': ['配置', 'config', 'setup', 'settings', '修改', '变更', '更新'], '技能操作': ['技能', 'skill', '安装', '创建', '卸载', '删除', '移除', 'skill'], '定时任务': ['定时', 'cron', '任务', 'schedule', 'job', '早报'], '调试排错': ['调试', '测试', 'test', 'debug', '错误', 'error', '失败', 'fail', '问题'], '决策讨论': ['决策', '决定', '方案', '选择', '最终', '结论', '分析'], '搜索查询': ['搜索', '查找', 'query', 'find', 'check', '查询'], 'API集成': ['api', 'key', 'token', '集成', 'integration'], '系统维护': ['重启', 'reset', 'restart', '维护', '清理', 'gateway'], '代码提交': ['git', '提交', 'commit', 'push', 'pr'], '文档记录': ['记录', '文档', 'journal', 'memory', '笔记'], } events = [] seen_contents = set() # 用于去重 for msg in messages: content = msg['content'] content_hash = content[:100] # 用前100字符作为去重key if content_hash in seen_contents: continue seen_contents.add(content_hash) # 检查是否匹配任何关键词类别 for category, keywords in important_keywords.items(): for keyword in keywords: if keyword.lower() in content.lower(): events.append({ 'time': msg.get('session_time', 'unknown'), 'category': category, 'content': content[:200] + '...' if len(content) > 200 else content, 'session': msg.get('session', 'unknown')[:20] }) break else: continue break return events def generate_daily_summary(events: List[Dict]) -> str: """ 生成每日活动摘要 """ if not events: return "今日暂无重要活动记录" summary = f"\n📋 今日活动摘要(共 {len(events)} 个事件):\n" summary += "=" * 60 + "\n" # 按类别分组 by_category = defaultdict(list) for event in events: by_category[event['category']].append(event) for category, cat_events in sorted(by_category.items()): summary += f"\n【{category}】({len(cat_events)} 个)\n" for i, event in enumerate(cat_events[:3], 1): summary += f" {i}. [{event['time']}] {event['content']}\n" if len(cat_events) > 3: summary += f" ... 还有 {len(cat_events) - 3} 个相关事件\n" return summary def analyze_sessions_for_events() -> Tuple[bool, List[Dict], str]: """ 分析今日所有 session,检查是否有重要事件需要记录 返回:(是否需要补充记录, 事件列表, 摘要文本) """ print("\n" + "=" * 60) print("🔍 跨 Session 消息聚合分析 V2") print("=" * 60) session_files = get_today_session_files() if not session_files: print("\n⚠️ 未找到今日 session 文件") return False, [], "未找到 session 文件" print(f"\n📁 找到 {len(session_files)} 个 session 文件(含归档):") current_count = sum(1 for f in session_files if '.reset.' not in f['name'] and '.deleted.' not in f['name']) reset_count = sum(1 for f in session_files if '.reset.' in f['name']) deleted_count = sum(1 for f in session_files if '.deleted.' in f['name']) print(f" - 当前活跃: {current_count} 个") print(f" - 重置归档: {reset_count} 个") print(f" - 删除归档: {deleted_count} 个") # 关键步骤:跨 session 聚合所有消息 print("\n🔄 正在聚合所有 session 的真实用户消息...") all_messages = aggregate_messages_across_sessions(session_files) if not all_messages: print(" ⚠️ 未提取到真实用户消息(已过滤系统提示)") return False, [], "未提取到用户消息" print(f" ✅ 成功聚合 {len(all_messages)} 条用户消息(已过滤系统消息)") # 显示活动时间线 print(f"\n⏱️ 活动时间跨度:") first_time = all_messages[0].get('session_time', 'unknown') last_time = all_messages[-1].get('session_time', 'unknown') print(f" 开始:{first_time}") print(f" 结束:{last_time}") # 显示跨 session 统计 session_stats = defaultdict(int) for msg in all_messages: session_stats[msg.get('session', 'unknown')[:20]] += 1 print(f"\n📊 各 Session 消息分布:") for session_name, count in sorted(session_stats.items(), key=lambda x: -x[1])[:5]: print(f" - {session_name}: {count} 条") # 检测重要事件 print("\n🎯 检测重要事件...") events = detect_important_events(all_messages) if events: print(f" ✅ 识别到 {len(events)} 个重要事件(去重后)") else: print(" ℹ️ 未识别到重要事件") # 生成摘要 summary = generate_daily_summary(events) print(summary) # 判断是否需要补充记录 has_today_journal = check_today_journal() needs_update = len(events) >= 3 and not has_today_journal if needs_update: print(f"\n🚨 发现遗漏:今日有 {len(events)} 个重要事件但未写入 L2") print(f" 建议:执行 '补充今日 L2 记录'") elif has_today_journal: print(f"\n✅ 已记录 L2,跨 session 聚合完成") print(f" 共处理 {len(session_files)} 个 session,提取 {len(all_messages)} 条消息") else: print(f"\n⚠️ 今日无重要活动或已记录完毕") return needs_update, events, summary def main(): """主函数。""" today_str = datetime.now().strftime("%Y-%m-%d") print(f"📅 日期检查: {today_str}") # 关键步骤:跨 session 聚合分析 needs_update, events, summary = analyze_sessions_for_events() # 检查 L0 大小 l0_size = get_l0_size() print(f"\n📊 L0 (MEMORY.md) 大小检查:") print(f" 当前: {format_size(l0_size)} / 4KB") if l0_size > 4096: print(" 🚨 警告:超过 4KB 红线!需要立即归档到 L1") elif l0_size > 3500: print(" ⚠️ 提醒:接近 4KB 限制,建议准备归档") else: print(" ✅ 大小正常") # 维护清单 print("\n" + "=" * 60) print("📋 每日维护清单:") has_today_journal = check_today_journal() if has_today_journal: print(" [x] L2 记录已存在") else: print(" [ ] 如有重要事件,写入今日 L2") if events: print(f" [x] 已扫描并聚合 {len(events)} 个重要事件(跨 session)") else: print(" [-] 今日无重要活动") print(" [ ] 检查 MEMORY.md 最近活动摘要") if l0_size > 3500: print(" [ ] L0 接近限制,考虑归档到 L1") print(" [ ] 确认 L0 层引用链接有效") print("\n💡 改进说明:") print(" - 新增跨 session 消息聚合功能") print(" - 智能过滤系统提示消息") print(" - 自动识别 .reset. 和 .deleted. 归档文件") print(" - 按时间线重建完整活动记录") # 返回状态码 if needs_update: return 2 # 需要补充记录 elif not has_today_journal: return 1 # 无 L2 记录 else: return 0 # 一切正常 if __name__ == "__main__": sys.exit(main())