| | |
| | | #!/usr/bin/env python3 |
| | | """ |
| | | 每日记忆检查脚本 |
| | | 在晚上10点后触发,检查今日是否已写入L2 |
| | | 在晚上10点后触发,检查今日是否已写入L2,并扫描session确保无遗漏 |
| | | """ |
| | | |
| | | import os |
| | | import sys |
| | | import json |
| | | import re |
| | | from datetime import datetime |
| | | from pathlib import Path |
| | | from typing import List, Dict, Optional, Tuple |
| | | |
| | | |
| | | def get_workspace_path() -> Path: |
| | | """获取workspace路径。""" |
| | | return Path.home() / ".openclaw" / "workspace" |
| | | |
| | | |
| | | def get_sessions_path() -> Path: |
| | | """获取sessions路径。""" |
| | | return Path.home() / ".openclaw" / "agents" / "main" / "sessions" |
| | | |
| | | |
| | | def check_today_journal() -> bool: |
| | |
| | | return f"{kb:.1f}KB" |
| | | |
| | | |
| | | def get_today_session_files() -> List[Path]: |
| | | """ |
| | | 获取今日所有session文件(包括.reset.和.deleted.归档) |
| | | 这是确保"没有遗漏"的关键步骤 |
| | | """ |
| | | sessions_dir = get_sessions_path() |
| | | if not sessions_dir.exists(): |
| | | return [] |
| | | |
| | | today = datetime.now() |
| | | today_files = [] |
| | | |
| | | # 扫描所有.jsonl文件(包括.reset.和.deleted.) |
| | | for file in sessions_dir.glob("*.jsonl*"): |
| | | try: |
| | | # 检查文件修改时间 |
| | | mtime = datetime.fromtimestamp(file.stat().st_mtime) |
| | | if mtime.date() == today.date(): |
| | | today_files.append(file) |
| | | except (OSError, ValueError): |
| | | continue |
| | | |
| | | # 按修改时间排序 |
| | | today_files.sort(key=lambda f: f.stat().st_mtime, reverse=True) |
| | | return today_files |
| | | |
| | | |
| | | def extract_feishu_messages(file_path: Path, max_messages: int = 50) -> List[Dict]: |
| | | """ |
| | | 从session文件中提取飞书渠道的消息 |
| | | 返回用户发送的消息列表 |
| | | """ |
| | | messages = [] |
| | | |
| | | try: |
| | | with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: |
| | | for line_num, line in enumerate(f): |
| | | if line_num >= max_messages * 3: # 限制读取行数 |
| | | break |
| | | |
| | | line = line.strip() |
| | | if not line: |
| | | continue |
| | | |
| | | try: |
| | | record = json.loads(line) |
| | | |
| | | # 只处理消息类型 |
| | | if record.get("type") != "message": |
| | | continue |
| | | |
| | | msg = record.get("message", {}) |
| | | if not msg: |
| | | continue |
| | | |
| | | # 检查是否是用户消息(role为user) |
| | | if msg.get("role") != "user": |
| | | continue |
| | | |
| | | # 提取内容 |
| | | content_list = msg.get("content", []) |
| | | if not content_list: |
| | | continue |
| | | |
| | | # 查找文本内容 |
| | | text_content = "" |
| | | for item in content_list: |
| | | if isinstance(item, dict) and item.get("type") == "text": |
| | | text = item.get("text", "") |
| | | # 过滤掉系统消息 |
| | | if text and not text.startswith("[") and len(text) > 10: |
| | | text_content = text |
| | | break |
| | | |
| | | if text_content: |
| | | messages.append({ |
| | | "timestamp": record.get("timestamp", ""), |
| | | "content": text_content[:200] # 限制长度 |
| | | }) |
| | | |
| | | if len(messages) >= max_messages: |
| | | break |
| | | |
| | | except json.JSONDecodeError: |
| | | continue |
| | | |
| | | except (IOError, OSError) as e: |
| | | print(f" 警告:无法读取文件 {file_path.name}: {e}") |
| | | |
| | | return messages |
| | | |
| | | |
| | | def analyze_sessions_for_events() -> Tuple[bool, List[str]]: |
| | | """ |
| | | 分析今日session,检查是否有重要事件需要记录 |
| | | 返回:(是否需要补充记录, 事件列表) |
| | | """ |
| | | print("\n🔍 扫描今日session文件(检查是否遗漏):") |
| | | |
| | | session_files = get_today_session_files() |
| | | |
| | | if not session_files: |
| | | print(" ⚠️ 未找到今日session文件") |
| | | return False, [] |
| | | |
| | | print(f" 找到 {len(session_files)} 个session文件:") |
| | | for f in session_files: |
| | | mtime = datetime.fromtimestamp(f.stat().st_mtime) |
| | | print(f" - {f.name} ({mtime.strftime('%H:%M')})") |
| | | |
| | | # 关键词列表,用于识别重要事件 |
| | | important_keywords = [ |
| | | "安装", "创建", "配置", "定时任务", "cron", "技能", "skill", |
| | | "早报", "更新", "修改", "决策", "设定" |
| | | ] |
| | | |
| | | found_events = [] |
| | | total_user_messages = 0 |
| | | |
| | | for file_path in session_files: |
| | | messages = extract_feishu_messages(file_path, max_messages=20) |
| | | total_user_messages += len(messages) |
| | | |
| | | for msg in messages: |
| | | content = msg["content"] |
| | | # 检查是否包含重要事件关键词 |
| | | for keyword in important_keywords: |
| | | if keyword in content and len(content) > 20: |
| | | event_summary = content[:100] + "..." if len(content) > 100 else content |
| | | if event_summary not in found_events: |
| | | found_events.append(event_summary) |
| | | break |
| | | |
| | | print(f"\n 提取到 {total_user_messages} 条用户消息") |
| | | |
| | | if found_events: |
| | | print(f" 识别到 {len(found_events)} 个可能的重要事件:") |
| | | for i, event in enumerate(found_events[:5], 1): # 只显示前5个 |
| | | print(f" {i}. {event}") |
| | | |
| | | # 判断是否需要补充记录 |
| | | needs_update = len(found_events) >= 2 and not check_today_journal() |
| | | |
| | | return needs_update, found_events |
| | | |
| | | |
| | | def main(): |
| | | """主函数。""" |
| | | today_str = datetime.now().strftime("%Y-%m-%d") |
| | |
| | | print(" ✅ 今日已有journal记录") |
| | | else: |
| | | print(" ⚠️ 今日尚未创建journal记录") |
| | | print(" 💡 建议:如有重要决策或事件,写入L2详情层") |
| | | |
| | | # 关键步骤:扫描session文件确保无遗漏 |
| | | needs_update, events = analyze_sessions_for_events() |
| | | |
| | | if needs_update: |
| | | print(f"\n🚨 发现遗漏:今日有session活动但未写入L2") |
| | | print(f" 识别到 {len(events)} 个事件需要记录") |
| | | print(" 建议:执行 '检查今天的session并生成总结'") |
| | | elif has_today_journal: |
| | | print("\n ✅ 已记录L2,session扫描完成") |
| | | else: |
| | | print("\n ⚠️ 今日无重要活动或已记录完毕") |
| | | |
| | | # 检查L0大小 |
| | | l0_size = get_l0_size() |
| | |
| | | |
| | | print("\n" + "=" * 50) |
| | | print("📋 每日维护清单:") |
| | | if not has_today_journal: |
| | | print(" [ ] 如有重要事件,写入今日L2") |
| | | else: |
| | | if has_today_journal: |
| | | print(" [x] L2记录已存在") |
| | | else: |
| | | print(" [ ] 如有重要事件,写入今日L2") |
| | | |
| | | if session_files := get_today_session_files(): |
| | | print(f" [x] 已扫描 {len(session_files)} 个session文件") |
| | | else: |
| | | print(" [-] 今日无session活动") |
| | | |
| | | print(" [ ] 检查MEMORY.md最近活动摘要") |
| | | if l0_size > 3500: |
| | | print(" [ ] L0接近限制,考虑归档到L1") |
| | | print(" [ ] 确认L0层引用链接有效") |
| | | |
| | | return 0 if has_today_journal else 1 |
| | | # 返回状态码 |
| | | if needs_update: |
| | | return 2 # 需要补充记录 |
| | | elif not has_today_journal: |
| | | return 1 # 无L2记录 |
| | | else: |
| | | return 0 # 一切正常 |
| | | |
| | | |
| | | if __name__ == "__main__": |