#!/usr/bin/env python3 """ 每周维护脚本 周一早上9:30执行,负责: 1. 自动合并L2→L1(基于内容筛选策略) 2. 检查L0大小 3. 生成周报 4. 发送报告(可选) """ import os import sys import subprocess import re from datetime import datetime, timedelta from pathlib import Path def get_workspace_path() -> Path: """获取workspace路径。""" return Path.home() / ".openclaw" / "workspace" def should_merge_to_l1(content: str) -> tuple: """ 判断L2内容是否应该合并到L1 返回: (是否应该合并, 原因/类别) """ content_lower = content.lower() # 策略1: 关键词匹配 - 重要决策类 decision_keywords = [ "决策", "决定", "结论", "方案", "选择", "采用", "确定", "最终", "resolved", "解决", "关键", "重要", "核心", "原则" ] # 策略2: 技术方案类 tech_keywords = [ "架构", "设计", "实现", "配置", "优化", "部署", "迁移", "升级", "重构", "方案" ] # 策略3: 经验教训类 lesson_keywords = [ "教训", "经验", "学习", "注意", "避免", "问题", "bug", "错误", "失败原因" ] # 策略4: 流程规范类 process_keywords = [ "流程", "规范", "规则", "约定", "标准", "红线", "必须", "禁止", "要求" ] # 检查匹配 matched_keywords = [] category = None for kw in decision_keywords: if kw in content_lower: matched_keywords.append(kw) category = "决策记录" break if not category: for kw in tech_keywords: if kw in content_lower: matched_keywords.append(kw) category = "技术方案" break if not category: for kw in lesson_keywords: if kw in content_lower: matched_keywords.append(kw) category = "经验教训" break if not category: for kw in process_keywords: if kw in content_lower: matched_keywords.append(kw) category = "流程规范" break # 策略5: 内容长度检查(太短的内容不值得合并) if len(content) < 200: return False, "内容过短" # 策略6: 必须有结构化标记(有###标题的才算正式记录) has_structure = bool(re.search(r'###\s+', content)) if not has_structure: return False, "缺乏结构化标记" if category: return True, category return False, "未匹配合并策略" def is_duplicate_in_l1(content: str, l1_file: Path) -> bool: """检查内容是否已在L1中存在(简单去重)""" if not l1_file.exists(): return False try: l1_content = l1_file.read_text(encoding='utf-8') # 提取内容前100字作为指纹 content_fingerprint = content[:100].strip() # 检查L1中是否已有相似内容 return content_fingerprint in l1_content except Exception: return False def auto_merge_l2_to_l1() -> dict: """ 自动合并本周L2到L1 返回合并统计信息 """ workspace = get_workspace_path() journal_dir = workspace / "memory" / "journal" milestones_dir = workspace / "memory" / "milestones" if not journal_dir.exists(): return {"status": "no_journal", "merged": 0, "skipped": 0, "details": []} # 确保milestones目录存在 milestones_dir.mkdir(parents=True, exist_ok=True) # 获取本周日期范围 today = datetime.now() start_of_week = today - timedelta(days=today.weekday()) # 本周的L1文件 current_month = today.strftime("%Y-%m") l1_file = milestones_dir / f"{current_month}-weekly.md" merged_count = 0 skipped_count = 0 details = [] # 遍历本周L2文件 for l2_file in sorted(journal_dir.glob("*.md")): try: file_date = datetime.strptime(l2_file.stem, "%Y-%m-%d") if not (start_of_week <= file_date <= today): continue # 读取L2内容 l2_content = l2_file.read_text(encoding='utf-8') # 按事件分割(按##标题分割) events = re.split(r'\n##\s+\[', l2_content) for event in events[1:]: # 第一个通常是文件头 event = "## [" + event # 恢复标题标记 # 提取事件标题 title_match = re.search(r'##\s*\[.*?\]\s*(.+?)\n', event) title = title_match.group(1).strip() if title_match else "未命名事件" # 判断是否应该合并 should_merge, reason = should_merge_to_l1(event) if not should_merge: skipped_count += 1 details.append({ "date": l2_file.stem, "title": title, "action": "跳过", "reason": reason }) continue # 检查是否重复 if is_duplicate_in_l1(event, l1_file): skipped_count += 1 details.append({ "date": l2_file.stem, "title": title, "action": "跳过", "reason": "L1中已存在" }) continue # 执行合并 try: # 准备L1格式内容 l1_entry = f""" ## [{l2_file.stem}] {title} **类别**: {reason} **来源**: [L2详情](./journal/{l2_file.name}) ### 摘要 {event[:500]}... --- """ # 追加到L1文件 with open(l1_file, 'a', encoding='utf-8') as f: f.write(l1_entry) merged_count += 1 details.append({ "date": l2_file.stem, "title": title, "action": "已合并", "category": reason }) except Exception as e: details.append({ "date": l2_file.stem, "title": title, "action": "失败", "reason": str(e) }) except ValueError: continue # 文件名格式不正确 except Exception as e: details.append({ "date": l2_file.stem if 'l2_file' in locals() else "unknown", "title": "读取失败", "action": "错误", "reason": str(e) }) return { "status": "success" if merged_count > 0 else "no_merge", "merged": merged_count, "skipped": skipped_count, "l1_file": str(l1_file) if merged_count > 0 else None, "details": details } def run_memory_merger() -> tuple: """运行自动合并(替代原memory-merger调用)""" result = auto_merge_l2_to_l1() if result["status"] == "no_journal": return False, "未找到journal目录" if result["status"] == "success": summary = f"✅ 合并完成: {result['merged']} 条 → {result['l1_file']}\n" summary += f"⏭️ 跳过: {result['skipped']} 条\n" summary += "\n详细记录:\n" for d in result['details'][-5:]: # 只显示最后5条 summary += f" - [{d['date']}] {d['title']}: {d['action']}" if 'reason' in d: summary += f" ({d['reason']})" summary += "\n" return True, summary else: return False, f"未找到可合并内容(本周共扫描 {result['skipped']} 条,均不符合合并条件)" def check_l0_size() -> dict: """检查L0状态。""" workspace = get_workspace_path() memory_file = workspace / "MEMORY.md" if not memory_file.exists(): return {"exists": False, "size": 0, "status": "missing"} size = memory_file.stat().st_size kb = size / 1024 if size > 4096: status = "over_limit" elif size > 3500: status = "warning" else: status = "ok" return { "exists": True, "size": size, "size_kb": kb, "status": status } def count_journal_files() -> int: """统计本周L2文件数量。""" workspace = get_workspace_path() journal_dir = workspace / "memory" / "journal" if not journal_dir.exists(): return 0 # 获取本周日期范围 today = datetime.now() start_of_week = today - timedelta(days=today.weekday()) count = 0 for f in journal_dir.glob("*.md"): try: file_date = datetime.strptime(f.stem, "%Y-%m-%d") if start_of_week <= file_date <= today: count += 1 except ValueError: continue return count def count_milestone_files() -> int: """统计L1里程碑文件数量。""" workspace = get_workspace_path() milestones_dir = workspace / "memory" / "milestones" if not milestones_dir.exists(): return 0 return len(list(milestones_dir.glob("*.md"))) def generate_report() -> str: """生成周报内容。""" today_str = datetime.now().strftime("%Y-%m-%d") week_start = (datetime.now() - timedelta(days=datetime.now().weekday())).strftime("%Y-%m-%d") report = [] report.append("# 📊 记忆管理周报") report.append(f"**周期**: {week_start} ~ {today_str}") report.append(f"**生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M')}") report.append("") report.append("---") report.append("") # L0状态 l0_status = check_l0_size() report.append("## 📋 L0层 (MEMORY.md)") if l0_status["exists"]: report.append(f"- **大小**: {l0_status['size_kb']:.1f}KB / 4KB") if l0_status["status"] == "ok": report.append("- **状态**: ✅ 正常") elif l0_status["status"] == "warning": report.append("- **状态**: ⚠️ 接近限制,建议归档") else: report.append("- **状态**: 🚨 超过红线,需要立即归档") else: report.append("- **状态**: ❌ 文件不存在") report.append("") # L2统计 journal_count = count_journal_files() report.append("## 📝 L2层 (Journal)") report.append(f"- **本周新增**: {journal_count} 条记录") report.append("") # L1统计 milestone_count = count_milestone_files() report.append("## 🗂️ L1层 (Milestones)") report.append(f"- **里程碑总数**: {milestone_count} 个主题") report.append("") # 维护任务 report.append("## 🔧 本周维护任务") # 尝试运行memory-merger success, output = run_memory_merger() if success: report.append("- ✅ L2→L1合并完成") if output.strip(): report.append(f"- 📄 合并详情:\n```\n{output}\n```") else: report.append(f"- ❌ L2→L1合并失败: {output}") if l0_status["status"] in ["warning", "over_limit"]: report.append("- ⚠️ L0层需要归档整理") report.append("") report.append("---") report.append("") report.append("*由memory-management技能自动生成*") return "\n".join(report) def main(): """主函数。""" import argparse parser = argparse.ArgumentParser(description="三层记忆每周维护") parser.add_argument("--send-report", action="store_true", help="发送报告到飞书") parser.add_argument("--output", type=str, help="报告输出文件路径") args = parser.parse_args() print("🔄 开始执行每周维护...") print("=" * 50) # 生成报告 report = generate_report() # 输出到文件 if args.output: with open(args.output, 'w', encoding='utf-8') as f: f.write(report) print(f"✅ 报告已保存到: {args.output}") # 打印报告 print("\n" + report) # 发送到飞书(如果需要) if args.send_report: print("\n📤 发送到飞书...") # 这里会调用message工具,但在脚本中我们通过stdout返回 print(report) print("\n" + "=" * 50) print("✅ 每周维护完成") return 0 if __name__ == "__main__": sys.exit(main())