From 38938880eb8916d36079fd2405e7d336f8550517 Mon Sep 17 00:00:00 2001
From: TevinClaw <510129976@qq.com>
Date: Sun, 15 Mar 2026 23:23:58 +0800
Subject: [PATCH] 优化 memory-management 技能:增强跨 session 消息聚合能力
---
workspace/skills/memory-management/scripts/daily_check.py | 438 +++++++++++++++++++++++++++++++++++++-----------------
1 files changed, 298 insertions(+), 140 deletions(-)
diff --git a/workspace/skills/memory-management/scripts/daily_check.py b/workspace/skills/memory-management/scripts/daily_check.py
index 07630b7..1e880cc 100755
--- a/workspace/skills/memory-management/scripts/daily_check.py
+++ b/workspace/skills/memory-management/scripts/daily_check.py
@@ -1,30 +1,36 @@
#!/usr/bin/env python3
"""
-每日记忆检查脚本
-在晚上10点后触发,检查今日是否已写入L2,并扫描session确保无遗漏
+每日记忆检查脚本 V2 - 增强跨 session 消息聚合能力
+优化点:
+1. 跨 session 消息聚合 - 合并所有 session 的消息按时间排序
+2. 增强事件检测 - 支持更多关键词和模式匹配
+3. 处理 session 重置 - 正确识别 .reset. 和 .deleted. 文件
+4. 完整时间线生成 - 按时间顺序展示今日所有活动
+5. 智能消息过滤 - 区分真实用户消息和系统提示
"""
import os
import sys
import json
import re
-from datetime import datetime
+from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Optional, Tuple
+from collections import defaultdict
def get_workspace_path() -> Path:
- """获取workspace路径。"""
+ """获取 workspace 路径。"""
return Path.home() / ".openclaw" / "workspace"
def get_sessions_path() -> Path:
- """获取sessions路径。"""
+ """获取 sessions 路径。"""
return Path.home() / ".openclaw" / "agents" / "main" / "sessions"
def check_today_journal() -> bool:
- """检查今日是否已有L2记录。"""
+ """检查今日是否已有 L2 记录。"""
workspace = get_workspace_path()
today = datetime.now().strftime("%Y-%m-%d")
journal_file = workspace / "memory" / "journal" / f"{today}.md"
@@ -32,7 +38,7 @@
def get_l0_size() -> int:
- """获取MEMORY.md文件大小(字节)。"""
+ """获取 MEMORY.md 文件大小(字节)。"""
workspace = get_workspace_path()
memory_file = workspace / "MEMORY.md"
if memory_file.exists():
@@ -46,10 +52,10 @@
return f"{kb:.1f}KB"
-def get_today_session_files() -> List[Path]:
+def get_today_session_files() -> List[Dict]:
"""
- 获取今日所有session文件(包括.reset.和.deleted.归档)
- 这是确保"没有遗漏"的关键步骤
+ 获取今日所有 session 文件(包括 .reset. 和 .deleted. 归档)
+ 按修改时间排序,确保能重建完整时间线
"""
sessions_dir = get_sessions_path()
if not sessions_dir.exists():
@@ -58,200 +64,352 @@
today = datetime.now()
today_files = []
- # 扫描所有.jsonl文件(包括.reset.和.deleted.)
- for file in sessions_dir.glob("*.jsonl*"):
- try:
- # 检查文件修改时间
- mtime = datetime.fromtimestamp(file.stat().st_mtime)
- if mtime.date() == today.date():
- today_files.append(file)
- except (OSError, ValueError):
- continue
+ # 扫描所有 .jsonl 相关文件(包括 .reset. 和 .deleted.)
+ for pattern in ["*.jsonl", "*.jsonl.reset.*", "*.jsonl.deleted.*"]:
+ for file in sessions_dir.glob(pattern):
+ try:
+ mtime = datetime.fromtimestamp(file.stat().st_mtime)
+ if mtime.date() == today.date():
+ today_files.append({
+ 'path': file,
+ 'mtime': mtime,
+ 'name': file.name
+ })
+ except (OSError, ValueError):
+ continue
# 按修改时间排序
- today_files.sort(key=lambda f: f.stat().st_mtime, reverse=True)
+ today_files.sort(key=lambda x: x['mtime'])
return today_files
-def extract_feishu_messages(file_path: Path, max_messages: int = 50) -> List[Dict]:
+def extract_user_content(text: str) -> Optional[str]:
"""
- 从session文件中提取飞书渠道的消息
- 返回用户发送的消息列表
+ 从消息文本中提取用户的实际内容
+ 过滤掉系统提示、元数据等
+ """
+ if not text or len(text) < 10:
+ return None
+
+ # 跳过纯系统提示消息
+ system_indicators = [
+ "OpenClaw runtime context",
+ "[Subagent Context]",
+ "You are running as a subagent",
+ "Results auto-announce",
+ "This context is runtime-generated",
+ "Keep internal details private",
+ "conversation info (untrusted)",
+ "feishu control message",
+ "feishu event type:",
+ ]
+
+ lower_text = text.lower()
+ for indicator in system_indicators:
+ if indicator.lower() in lower_text[:200]:
+ return None
+
+ # 处理飞书消息格式 - 提取实际用户内容
+ # 格式:System: [时间] Feishu[main] DM from xxx: 实际内容
+ feishu_match = re.search(r'Feishu\[.*?\]\s+\w+\s+from\s+\w+:\s*(.+?)(?=\n\n|$)', text, re.DOTALL)
+ if feishu_match:
+ content = feishu_match.group(1).strip()
+ # 移除 JSON 元数据块
+ content = re.sub(r'```json\s*\{.*?\}\s*```', '', content, flags=re.DOTALL)
+ content = content.strip()
+ if len(content) > 10:
+ return content
+ return None
+
+ # 如果是普通用户消息(非系统消息),直接返回
+ if not text.startswith("System:") and not text.startswith("["):
+ return text.strip() if len(text) > 10 else None
+
+ return None
+
+
+def extract_messages_from_session(file_info: Dict) -> List[Dict]:
+ """
+ 从 session 文件中提取所有真实用户消息
+ 增强版:过滤系统消息,提取实际用户内容
"""
messages = []
+ file_path = file_info['path']
+ session_name = file_info['name']
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
- for line_num, line in enumerate(f):
- if line_num >= max_messages * 3: # 限制读取行数
- break
+ lines = f.readlines()
+
+ for line in lines:
+ line = line.strip()
+ if not line:
+ continue
+
+ try:
+ record = json.loads(line)
- line = line.strip()
- if not line:
+ # 只处理消息类型
+ if record.get("type") != "message":
continue
- try:
- record = json.loads(line)
-
- # 只处理消息类型
- if record.get("type") != "message":
- continue
-
- msg = record.get("message", {})
- if not msg:
- continue
-
- # 检查是否是用户消息(role为user)
- if msg.get("role") != "user":
- continue
-
- # 提取内容
- content_list = msg.get("content", [])
- if not content_list:
- continue
-
- # 查找文本内容
- text_content = ""
- for item in content_list:
- if isinstance(item, dict) and item.get("type") == "text":
- text = item.get("text", "")
- # 过滤掉系统消息
- if text and not text.startswith("[") and len(text) > 10:
- text_content = text
- break
-
- if text_content:
- messages.append({
- "timestamp": record.get("timestamp", ""),
- "content": text_content[:200] # 限制长度
- })
+ msg = record.get("message", {})
+ if not msg:
+ continue
+
+ # 只提取用户消息
+ if msg.get("role") != "user":
+ continue
+
+ content_list = msg.get("content", [])
+ if not content_list:
+ continue
+
+ # 提取文本内容
+ for item in content_list:
+ if isinstance(item, dict) and item.get("type") == "text":
+ text = item.get("text", "")
- if len(messages) >= max_messages:
- break
-
- except json.JSONDecodeError:
- continue
-
+ # 提取真实用户内容(过滤系统消息)
+ user_content = extract_user_content(text)
+ if user_content:
+ messages.append({
+ 'timestamp': record.get("timestamp", ""),
+ 'content': user_content[:400], # 限制长度
+ 'session': session_name,
+ 'session_time': file_info['mtime'].strftime('%H:%M:%S')
+ })
+ break
+
+ except json.JSONDecodeError:
+ continue
+
except (IOError, OSError) as e:
- print(f" 警告:无法读取文件 {file_path.name}: {e}")
+ print(f" 警告:无法读取文件 {file_info['name']}: {e}")
return messages
-def analyze_sessions_for_events() -> Tuple[bool, List[str]]:
+def aggregate_messages_across_sessions(session_files: List[Dict]) -> List[Dict]:
"""
- 分析今日session,检查是否有重要事件需要记录
- 返回:(是否需要补充记录, 事件列表)
+ 跨 session 聚合所有消息,按时间排序
+ 这是解决 session 分割问题的关键函数
"""
- print("\n🔍 扫描今日session文件(检查是否遗漏):")
+ all_messages = []
+
+ for file_info in session_files:
+ messages = extract_messages_from_session(file_info)
+ all_messages.extend(messages)
+
+ # 按时间戳排序,重建完整时间线
+ all_messages.sort(key=lambda x: x.get('timestamp', ''))
+
+ return all_messages
+
+
+def detect_important_events(messages: List[Dict]) -> List[Dict]:
+ """
+ 从聚合后的消息中检测重要事件
+ 增强版:支持更多关键词和上下文分析
+ """
+ # 扩展关键词列表
+ important_keywords = {
+ '配置变更': ['配置', 'config', 'setup', 'settings', '修改', '变更', '更新'],
+ '技能操作': ['技能', 'skill', '安装', '创建', '卸载', '删除', '移除', 'skill'],
+ '定时任务': ['定时', 'cron', '任务', 'schedule', 'job', '早报'],
+ '调试排错': ['调试', '测试', 'test', 'debug', '错误', 'error', '失败', 'fail', '问题'],
+ '决策讨论': ['决策', '决定', '方案', '选择', '最终', '结论', '分析'],
+ '搜索查询': ['搜索', '查找', 'query', 'find', 'check', '查询'],
+ 'API集成': ['api', 'key', 'token', '集成', 'integration'],
+ '系统维护': ['重启', 'reset', 'restart', '维护', '清理', 'gateway'],
+ '代码提交': ['git', '提交', 'commit', 'push', 'pr'],
+ '文档记录': ['记录', '文档', 'journal', 'memory', '笔记'],
+ }
+
+ events = []
+ seen_contents = set() # 用于去重
+
+ for msg in messages:
+ content = msg['content']
+ content_hash = content[:100] # 用前100字符作为去重key
+
+ if content_hash in seen_contents:
+ continue
+ seen_contents.add(content_hash)
+
+ # 检查是否匹配任何关键词类别
+ for category, keywords in important_keywords.items():
+ for keyword in keywords:
+ if keyword.lower() in content.lower():
+ events.append({
+ 'time': msg.get('session_time', 'unknown'),
+ 'category': category,
+ 'content': content[:200] + '...' if len(content) > 200 else content,
+ 'session': msg.get('session', 'unknown')[:20]
+ })
+ break
+ else:
+ continue
+ break
+
+ return events
+
+
+def generate_daily_summary(events: List[Dict]) -> str:
+ """
+ 生成每日活动摘要
+ """
+ if not events:
+ return "今日暂无重要活动记录"
+
+ summary = f"\n📋 今日活动摘要(共 {len(events)} 个事件):\n"
+ summary += "=" * 60 + "\n"
+
+ # 按类别分组
+ by_category = defaultdict(list)
+ for event in events:
+ by_category[event['category']].append(event)
+
+ for category, cat_events in sorted(by_category.items()):
+ summary += f"\n【{category}】({len(cat_events)} 个)\n"
+ for i, event in enumerate(cat_events[:3], 1):
+ summary += f" {i}. [{event['time']}] {event['content']}\n"
+ if len(cat_events) > 3:
+ summary += f" ... 还有 {len(cat_events) - 3} 个相关事件\n"
+
+ return summary
+
+
+def analyze_sessions_for_events() -> Tuple[bool, List[Dict], str]:
+ """
+ 分析今日所有 session,检查是否有重要事件需要记录
+ 返回:(是否需要补充记录, 事件列表, 摘要文本)
+ """
+ print("\n" + "=" * 60)
+ print("🔍 跨 Session 消息聚合分析 V2")
+ print("=" * 60)
session_files = get_today_session_files()
if not session_files:
- print(" ⚠️ 未找到今日session文件")
- return False, []
+ print("\n⚠️ 未找到今日 session 文件")
+ return False, [], "未找到 session 文件"
- print(f" 找到 {len(session_files)} 个session文件:")
- for f in session_files:
- mtime = datetime.fromtimestamp(f.stat().st_mtime)
- print(f" - {f.name} ({mtime.strftime('%H:%M')})")
+ print(f"\n📁 找到 {len(session_files)} 个 session 文件(含归档):")
+ current_count = sum(1 for f in session_files if '.reset.' not in f['name'] and '.deleted.' not in f['name'])
+ reset_count = sum(1 for f in session_files if '.reset.' in f['name'])
+ deleted_count = sum(1 for f in session_files if '.deleted.' in f['name'])
- # 关键词列表,用于识别重要事件
- important_keywords = [
- "安装", "创建", "配置", "定时任务", "cron", "技能", "skill",
- "早报", "更新", "修改", "决策", "设定"
- ]
+ print(f" - 当前活跃: {current_count} 个")
+ print(f" - 重置归档: {reset_count} 个")
+ print(f" - 删除归档: {deleted_count} 个")
- found_events = []
- total_user_messages = 0
+ # 关键步骤:跨 session 聚合所有消息
+ print("\n🔄 正在聚合所有 session 的真实用户消息...")
+ all_messages = aggregate_messages_across_sessions(session_files)
- for file_path in session_files:
- messages = extract_feishu_messages(file_path, max_messages=20)
- total_user_messages += len(messages)
-
- for msg in messages:
- content = msg["content"]
- # 检查是否包含重要事件关键词
- for keyword in important_keywords:
- if keyword in content and len(content) > 20:
- event_summary = content[:100] + "..." if len(content) > 100 else content
- if event_summary not in found_events:
- found_events.append(event_summary)
- break
+ if not all_messages:
+ print(" ⚠️ 未提取到真实用户消息(已过滤系统提示)")
+ return False, [], "未提取到用户消息"
- print(f"\n 提取到 {total_user_messages} 条用户消息")
+ print(f" ✅ 成功聚合 {len(all_messages)} 条用户消息(已过滤系统消息)")
- if found_events:
- print(f" 识别到 {len(found_events)} 个可能的重要事件:")
- for i, event in enumerate(found_events[:5], 1): # 只显示前5个
- print(f" {i}. {event}")
+ # 显示活动时间线
+ print(f"\n⏱️ 活动时间跨度:")
+ first_time = all_messages[0].get('session_time', 'unknown')
+ last_time = all_messages[-1].get('session_time', 'unknown')
+ print(f" 开始:{first_time}")
+ print(f" 结束:{last_time}")
+
+ # 显示跨 session 统计
+ session_stats = defaultdict(int)
+ for msg in all_messages:
+ session_stats[msg.get('session', 'unknown')[:20]] += 1
+
+ print(f"\n📊 各 Session 消息分布:")
+ for session_name, count in sorted(session_stats.items(), key=lambda x: -x[1])[:5]:
+ print(f" - {session_name}: {count} 条")
+
+ # 检测重要事件
+ print("\n🎯 检测重要事件...")
+ events = detect_important_events(all_messages)
+
+ if events:
+ print(f" ✅ 识别到 {len(events)} 个重要事件(去重后)")
+ else:
+ print(" ℹ️ 未识别到重要事件")
+
+ # 生成摘要
+ summary = generate_daily_summary(events)
+ print(summary)
# 判断是否需要补充记录
- needs_update = len(found_events) >= 2 and not check_today_journal()
+ has_today_journal = check_today_journal()
+ needs_update = len(events) >= 3 and not has_today_journal
- return needs_update, found_events
+ if needs_update:
+ print(f"\n🚨 发现遗漏:今日有 {len(events)} 个重要事件但未写入 L2")
+ print(f" 建议:执行 '补充今日 L2 记录'")
+ elif has_today_journal:
+ print(f"\n✅ 已记录 L2,跨 session 聚合完成")
+ print(f" 共处理 {len(session_files)} 个 session,提取 {len(all_messages)} 条消息")
+ else:
+ print(f"\n⚠️ 今日无重要活动或已记录完毕")
+
+ return needs_update, events, summary
def main():
"""主函数。"""
today_str = datetime.now().strftime("%Y-%m-%d")
print(f"📅 日期检查: {today_str}")
- print("=" * 50)
- # 检查今日L2
- has_today_journal = check_today_journal()
- print(f"\n📝 L2记录检查:")
- if has_today_journal:
- print(" ✅ 今日已有journal记录")
- else:
- print(" ⚠️ 今日尚未创建journal记录")
+ # 关键步骤:跨 session 聚合分析
+ needs_update, events, summary = analyze_sessions_for_events()
- # 关键步骤:扫描session文件确保无遗漏
- needs_update, events = analyze_sessions_for_events()
-
- if needs_update:
- print(f"\n🚨 发现遗漏:今日有session活动但未写入L2")
- print(f" 识别到 {len(events)} 个事件需要记录")
- print(" 建议:执行 '检查今天的session并生成总结'")
- elif has_today_journal:
- print("\n ✅ 已记录L2,session扫描完成")
- else:
- print("\n ⚠️ 今日无重要活动或已记录完毕")
-
- # 检查L0大小
+ # 检查 L0 大小
l0_size = get_l0_size()
print(f"\n📊 L0 (MEMORY.md) 大小检查:")
print(f" 当前: {format_size(l0_size)} / 4KB")
if l0_size > 4096:
- print(" 🚨 警告:超过4KB红线!需要立即归档到L1")
+ print(" 🚨 警告:超过 4KB 红线!需要立即归档到 L1")
elif l0_size > 3500:
- print(" ⚠️ 提醒:接近4KB限制,建议准备归档")
+ print(" ⚠️ 提醒:接近 4KB 限制,建议准备归档")
else:
print(" ✅ 大小正常")
- print("\n" + "=" * 50)
+ # 维护清单
+ print("\n" + "=" * 60)
print("📋 每日维护清单:")
+
+ has_today_journal = check_today_journal()
if has_today_journal:
- print(" [x] L2记录已存在")
+ print(" [x] L2 记录已存在")
else:
- print(" [ ] 如有重要事件,写入今日L2")
+ print(" [ ] 如有重要事件,写入今日 L2")
- if session_files := get_today_session_files():
- print(f" [x] 已扫描 {len(session_files)} 个session文件")
+ if events:
+ print(f" [x] 已扫描并聚合 {len(events)} 个重要事件(跨 session)")
else:
- print(" [-] 今日无session活动")
+ print(" [-] 今日无重要活动")
- print(" [ ] 检查MEMORY.md最近活动摘要")
+ print(" [ ] 检查 MEMORY.md 最近活动摘要")
if l0_size > 3500:
- print(" [ ] L0接近限制,考虑归档到L1")
- print(" [ ] 确认L0层引用链接有效")
+ print(" [ ] L0 接近限制,考虑归档到 L1")
+ print(" [ ] 确认 L0 层引用链接有效")
+
+ print("\n💡 改进说明:")
+ print(" - 新增跨 session 消息聚合功能")
+ print(" - 智能过滤系统提示消息")
+ print(" - 自动识别 .reset. 和 .deleted. 归档文件")
+ print(" - 按时间线重建完整活动记录")
# 返回状态码
if needs_update:
return 2 # 需要补充记录
elif not has_today_journal:
- return 1 # 无L2记录
+ return 1 # 无 L2 记录
else:
return 0 # 一切正常
--
Gitblit v1.9.1