#!/usr/bin/env python3
|
"""
|
每日记忆检查脚本
|
在晚上10点后触发,检查今日是否已写入L2,并扫描session确保无遗漏
|
"""
|
|
import os
|
import sys
|
import json
|
import re
|
from datetime import datetime
|
from pathlib import Path
|
from typing import List, Dict, Optional, Tuple
|
|
|
def get_workspace_path() -> Path:
    """Return the workspace directory, ``~/.openclaw/workspace``.

    The path is derived from the current user's home directory; it is not
    checked for existence.
    """
    return Path.home().joinpath(".openclaw", "workspace")
|
|
|
def get_sessions_path() -> Path:
    """Return the sessions directory, ``~/.openclaw/agents/main/sessions``.

    The path is derived from the current user's home directory; it is not
    checked for existence.
    """
    return Path.home().joinpath(".openclaw", "agents", "main", "sessions")
|
|
|
def check_today_journal() -> bool:
    """Report whether today's L2 journal entry exists.

    Looks for ``memory/journal/<YYYY-MM-DD>.md`` under the workspace, using
    today's local date for the file name.
    """
    journal_dir = get_workspace_path() / "memory" / "journal"
    date_stamp = f"{datetime.now():%Y-%m-%d}"
    return (journal_dir / f"{date_stamp}.md").exists()
|
|
|
def get_l0_size() -> int:
    """Return the size in bytes of ``MEMORY.md`` in the workspace.

    Returns 0 when the file does not exist.
    """
    l0_file = get_workspace_path() / "MEMORY.md"
    return l0_file.stat().st_size if l0_file.exists() else 0
|
|
|
def format_size(size_bytes: int) -> str:
    """Render a byte count in units of 1024 bytes with one decimal, e.g. ``"1.5KB"``."""
    return f"{size_bytes / 1024:.1f}KB"
|
|
|
def get_today_session_files() -> List[Path]:
    """Return every session file modified today, newest first.

    Every entry matching ``*.jsonl*`` in the sessions directory is
    considered, so names with extra suffixes after ``.jsonl`` (the
    ``.reset.`` / ``.deleted.`` archives) are picked up along with the live
    session files.

    Returns:
        Paths whose modification time falls on today's local date, sorted by
        modification time in descending order. An empty list if the sessions
        directory does not exist.
    """
    sessions_dir = get_sessions_path()
    if not sessions_dir.exists():
        return []

    today = datetime.now().date()

    # Remember the mtime read during the scan and sort on that value. Calling
    # stat() again at sort time would raise, outside any handler, if a file
    # was renamed or removed in between.
    stamped: List[Tuple[float, Path]] = []
    for candidate in sessions_dir.glob("*.jsonl*"):
        try:
            mtime = candidate.stat().st_mtime
            if datetime.fromtimestamp(mtime).date() == today:
                stamped.append((mtime, candidate))
        except (OSError, ValueError, OverflowError):
            # The file vanished or carries a timestamp that cannot be
            # converted; skip it rather than abort the scan.
            continue

    stamped.sort(key=lambda pair: pair[0], reverse=True)
    return [path for _, path in stamped]
|
|
|
def _first_user_text(content) -> str:
    """Return the first usable text found in a message's ``content``, or ``""``."""
    if isinstance(content, str):
        # Plain-string content is treated as a single text item.
        candidates = [content]
    elif isinstance(content, list):
        candidates = [
            item.get("text", "")
            for item in content
            if isinstance(item, dict) and item.get("type") == "text"
        ]
    else:
        return ""

    for text in candidates:
        # Texts starting with "[" or no longer than 10 characters are
        # filtered out as system messages.
        if isinstance(text, str) and len(text) > 10 and not text.startswith("["):
            return text
    return ""


def extract_feishu_messages(file_path: Path, max_messages: int = 50) -> List[Dict]:
    """Extract user messages from a JSON-lines session file.

    Each line is parsed as JSON. A record is kept when its ``type`` is
    ``"message"``, its ``message.role`` is ``"user"`` and its content holds a
    text longer than 10 characters that does not start with ``"["``.
    NOTE(review): despite the function name, nothing here filters by channel;
    every user-role message is considered.

    Malformed input is skipped rather than raised: blank lines, lines that
    are not valid JSON, JSON values that are not objects, and messages whose
    ``message`` or ``content`` has an unexpected type.

    Args:
        file_path: Session file to read (decoded as UTF-8, undecodable bytes
            ignored).
        max_messages: Maximum number of messages to return. Reading also
            stops after ``max_messages * 3`` lines of the file.

    Returns:
        A list of ``{"timestamp": ..., "content": ...}`` dicts in file order,
        with ``content`` truncated to 200 characters. If the file cannot be
        opened or read, a warning is printed and the messages collected so
        far (possibly none) are returned.
    """
    messages: List[Dict] = []
    line_budget = max_messages * 3  # cap on how many lines are read

    try:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as handle:
            for line_num, raw_line in enumerate(handle):
                if line_num >= line_budget:
                    break

                line = raw_line.strip()
                if not line:
                    continue

                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue

                # A valid JSON line can still be a list, string or number;
                # only objects can describe a message.
                if not isinstance(record, dict) or record.get("type") != "message":
                    continue

                msg = record.get("message")
                if not isinstance(msg, dict) or msg.get("role") != "user":
                    continue

                text_content = _first_user_text(msg.get("content"))
                if not text_content:
                    continue

                messages.append({
                    "timestamp": record.get("timestamp", ""),
                    "content": text_content[:200],  # keep summaries short
                })
                if len(messages) >= max_messages:
                    break

    except (IOError, OSError) as e:
        print(f" 警告:无法读取文件 {file_path.name}: {e}")

    return messages
|
|
|
def analyze_sessions_for_events() -> Tuple[bool, List[str]]:
    """Scan today's session files for user messages that look like notable events.

    Prints the files found, the number of user messages extracted (at most 20
    per file) and up to five of the detected events. A message counts as an
    event when it is longer than 20 characters and contains at least one of
    the keywords below; events are de-duplicated by their summary text
    (truncated to 100 characters plus ``"..."`` when longer).

    Returns:
        ``(needs_update, events)``. ``needs_update`` is True when at least two
        events were found and today's journal file does not exist yet.
        ``events`` is the full list of event summaries. Returns
        ``(False, [])`` when there are no session files for today.
    """
    print("\n🔍 扫描今日session文件(检查是否遗漏):")

    session_files = get_today_session_files()
    if not session_files:
        print(" ⚠️ 未找到今日session文件")
        return False, []

    print(f" 找到 {len(session_files)} 个session文件:")
    for f in session_files:
        try:
            stamp = datetime.fromtimestamp(f.stat().st_mtime).strftime("%H:%M")
        except (OSError, ValueError, OverflowError):
            # The file may have been renamed or removed since it was listed;
            # keep reporting instead of aborting the whole check.
            stamp = "--:--"
        print(f" - {f.name} ({stamp})")

    # Keywords that mark a message as a potentially important event.
    important_keywords = [
        "安装", "创建", "配置", "定时任务", "cron", "技能", "skill",
        "早报", "更新", "修改", "决策", "设定",
    ]

    found_events: List[str] = []
    total_user_messages = 0

    for file_path in session_files:
        messages = extract_feishu_messages(file_path, max_messages=20)
        total_user_messages += len(messages)

        for msg in messages:
            content = msg["content"]
            # Short messages are never treated as events.
            if len(content) <= 20:
                continue
            if not any(keyword in content for keyword in important_keywords):
                continue

            event_summary = (content[:100] + "...") if len(content) > 100 else content
            if event_summary not in found_events:
                found_events.append(event_summary)

    print(f"\n 提取到 {total_user_messages} 条用户消息")

    if found_events:
        print(f" 识别到 {len(found_events)} 个可能的重要事件:")
        for i, event in enumerate(found_events[:5], 1):  # show only the first five
            print(f" {i}. {event}")

    # A follow-up is needed only when there are several events and no journal.
    needs_update = len(found_events) >= 2 and not check_today_journal()

    return needs_update, found_events
|
|
|
def main():
    """Run the daily check, print the report and return the exit status.

    Returns 2 when session events were found but no journal exists, 1 when
    there is simply no journal for today, and 0 otherwise.
    """
    rule = "=" * 50

    today_str = datetime.now().strftime("%Y-%m-%d")
    print(f"📅 日期检查: {today_str}")
    print(rule)

    # L2 journal status for today.
    has_today_journal = check_today_journal()
    print("\n📝 L2记录检查:")
    print(" ✅ 今日已有journal记录" if has_today_journal else " ⚠️ 今日尚未创建journal记录")

    # Key step: scan the session files so that nothing is missed.
    needs_update, events = analyze_sessions_for_events()

    if needs_update:
        print("\n🚨 发现遗漏:今日有session活动但未写入L2")
        print(f" 识别到 {len(events)} 个事件需要记录")
        print(" 建议:执行 '检查今天的session并生成总结'")
    elif has_today_journal:
        print("\n ✅ 已记录L2,session扫描完成")
    else:
        print("\n ⚠️ 今日无重要活动或已记录完毕")

    # L0 (MEMORY.md) size against the 4KB limit.
    l0_size = get_l0_size()
    l0_near_limit = l0_size > 3500
    print("\n📊 L0 (MEMORY.md) 大小检查:")
    print(f" 当前: {format_size(l0_size)} / 4KB")

    if l0_size > 4096:
        print(" 🚨 警告:超过4KB红线!需要立即归档到L1")
    elif l0_near_limit:
        print(" ⚠️ 提醒:接近4KB限制,建议准备归档")
    else:
        print(" ✅ 大小正常")

    # Daily maintenance checklist.
    print("\n" + rule)
    print("📋 每日维护清单:")
    print(" [x] L2记录已存在" if has_today_journal else " [ ] 如有重要事件,写入今日L2")

    session_files = get_today_session_files()
    if session_files:
        print(f" [x] 已扫描 {len(session_files)} 个session文件")
    else:
        print(" [-] 今日无session活动")

    print(" [ ] 检查MEMORY.md最近活动摘要")
    if l0_near_limit:
        print(" [ ] L0接近限制,考虑归档到L1")
    print(" [ ] 确认L0层引用链接有效")

    # Exit status: 2 = journal needs to be supplemented, 1 = no journal, 0 = ok.
    if needs_update:
        return 2
    if not has_today_journal:
        return 1
    return 0
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
|