#!/usr/bin/env python3 """ 每周维护脚本 周一早上9:30执行,负责: 1. 运行memory-merger整理L2→L1 2. 检查L0大小 3. 生成周报 4. 发送报告(可选) """ import os import sys import subprocess from datetime import datetime, timedelta from pathlib import Path def get_workspace_path() -> Path: """获取workspace路径。""" return Path.home() / ".openclaw" / "workspace" def run_memory_merger() -> tuple: """运行memory-merger技能。""" workspace = get_workspace_path() merger_path = workspace / ".agents" / "skills" / "memory-merger" if not merger_path.exists(): return False, "memory-merger技能未安装" # 运行memory-merger try: result = subprocess.run( ["python", str(merger_path / "scripts" / "merge.py"), "memory-management"], capture_output=True, text=True, timeout=60 ) if result.returncode == 0: return True, result.stdout else: return False, result.stderr except Exception as e: return False, str(e) def check_l0_size() -> dict: """检查L0状态。""" workspace = get_workspace_path() memory_file = workspace / "MEMORY.md" if not memory_file.exists(): return {"exists": False, "size": 0, "status": "missing"} size = memory_file.stat().st_size kb = size / 1024 if size > 4096: status = "over_limit" elif size > 3500: status = "warning" else: status = "ok" return { "exists": True, "size": size, "size_kb": kb, "status": status } def count_journal_files() -> int: """统计本周L2文件数量。""" workspace = get_workspace_path() journal_dir = workspace / "memory" / "journal" if not journal_dir.exists(): return 0 # 获取本周日期范围 today = datetime.now() start_of_week = today - timedelta(days=today.weekday()) count = 0 for f in journal_dir.glob("*.md"): try: file_date = datetime.strptime(f.stem, "%Y-%m-%d") if start_of_week <= file_date <= today: count += 1 except ValueError: continue return count def count_milestone_files() -> int: """统计L1里程碑文件数量。""" workspace = get_workspace_path() milestones_dir = workspace / "memory" / "milestones" if not milestones_dir.exists(): return 0 return len(list(milestones_dir.glob("*.md"))) def generate_report() -> str: """生成周报内容。""" today_str = datetime.now().strftime("%Y-%m-%d") week_start = (datetime.now() - timedelta(days=datetime.now().weekday())).strftime("%Y-%m-%d") report = [] report.append("# 📊 记忆管理周报") report.append(f"**周期**: {week_start} ~ {today_str}") report.append(f"**生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M')}") report.append("") report.append("---") report.append("") # L0状态 l0_status = check_l0_size() report.append("## 📋 L0层 (MEMORY.md)") if l0_status["exists"]: report.append(f"- **大小**: {l0_status['size_kb']:.1f}KB / 4KB") if l0_status["status"] == "ok": report.append("- **状态**: ✅ 正常") elif l0_status["status"] == "warning": report.append("- **状态**: ⚠️ 接近限制,建议归档") else: report.append("- **状态**: 🚨 超过红线,需要立即归档") else: report.append("- **状态**: ❌ 文件不存在") report.append("") # L2统计 journal_count = count_journal_files() report.append("## 📝 L2层 (Journal)") report.append(f"- **本周新增**: {journal_count} 条记录") report.append("") # L1统计 milestone_count = count_milestone_files() report.append("## 🗂️ L1层 (Milestones)") report.append(f"- **里程碑总数**: {milestone_count} 个主题") report.append("") # 维护任务 report.append("## 🔧 本周维护任务") # 尝试运行memory-merger success, output = run_memory_merger() if success: report.append("- ✅ L2→L1合并完成") if output.strip(): report.append(f"- 📄 合并详情:\n```\n{output}\n```") else: report.append(f"- ❌ L2→L1合并失败: {output}") if l0_status["status"] in ["warning", "over_limit"]: report.append("- ⚠️ L0层需要归档整理") report.append("") report.append("---") report.append("") report.append("*由memory-management技能自动生成*") return "\n".join(report) def main(): """主函数。""" import argparse parser = argparse.ArgumentParser(description="三层记忆每周维护") parser.add_argument("--send-report", action="store_true", help="发送报告到飞书") parser.add_argument("--output", type=str, help="报告输出文件路径") args = parser.parse_args() print("🔄 开始执行每周维护...") print("=" * 50) # 生成报告 report = generate_report() # 输出到文件 if args.output: with open(args.output, 'w', encoding='utf-8') as f: f.write(report) print(f"✅ 报告已保存到: {args.output}") # 打印报告 print("\n" + report) # 发送到飞书(如果需要) if args.send_report: print("\n📤 发送到飞书...") # 这里会调用message工具,但在脚本中我们通过stdout返回 print(report) print("\n" + "=" * 50) print("✅ 每周维护完成") return 0 if __name__ == "__main__": sys.exit(main())