#!/usr/bin/env python3
|
"""
|
每周维护脚本
|
周一早上9:30执行,负责:
|
1. 运行memory-merger整理L2→L1
|
2. 检查L0大小
|
3. 生成周报
|
4. 发送报告(可选)
|
"""
|
|
import os
|
import sys
|
import subprocess
|
from datetime import datetime, timedelta
|
from pathlib import Path
|
|
|
def get_workspace_path() -> Path:
|
"""获取workspace路径。"""
|
return Path.home() / ".openclaw" / "workspace"
|
|
|
def run_memory_merger() -> tuple:
|
"""运行memory-merger技能。"""
|
workspace = get_workspace_path()
|
merger_path = workspace / ".agents" / "skills" / "memory-merger"
|
|
if not merger_path.exists():
|
return False, "memory-merger技能未安装"
|
|
# 运行memory-merger
|
try:
|
result = subprocess.run(
|
["python", str(merger_path / "scripts" / "merge.py"), "memory-management"],
|
capture_output=True,
|
text=True,
|
timeout=60
|
)
|
if result.returncode == 0:
|
return True, result.stdout
|
else:
|
return False, result.stderr
|
except Exception as e:
|
return False, str(e)
|
|
|
def check_l0_size() -> dict:
|
"""检查L0状态。"""
|
workspace = get_workspace_path()
|
memory_file = workspace / "MEMORY.md"
|
|
if not memory_file.exists():
|
return {"exists": False, "size": 0, "status": "missing"}
|
|
size = memory_file.stat().st_size
|
kb = size / 1024
|
|
if size > 4096:
|
status = "over_limit"
|
elif size > 3500:
|
status = "warning"
|
else:
|
status = "ok"
|
|
return {
|
"exists": True,
|
"size": size,
|
"size_kb": kb,
|
"status": status
|
}
|
|
|
def count_journal_files() -> int:
|
"""统计本周L2文件数量。"""
|
workspace = get_workspace_path()
|
journal_dir = workspace / "memory" / "journal"
|
|
if not journal_dir.exists():
|
return 0
|
|
# 获取本周日期范围
|
today = datetime.now()
|
start_of_week = today - timedelta(days=today.weekday())
|
|
count = 0
|
for f in journal_dir.glob("*.md"):
|
try:
|
file_date = datetime.strptime(f.stem, "%Y-%m-%d")
|
if start_of_week <= file_date <= today:
|
count += 1
|
except ValueError:
|
continue
|
|
return count
|
|
|
def count_milestone_files() -> int:
|
"""统计L1里程碑文件数量。"""
|
workspace = get_workspace_path()
|
milestones_dir = workspace / "memory" / "milestones"
|
|
if not milestones_dir.exists():
|
return 0
|
|
return len(list(milestones_dir.glob("*.md")))
|
|
|
def generate_report() -> str:
|
"""生成周报内容。"""
|
today_str = datetime.now().strftime("%Y-%m-%d")
|
week_start = (datetime.now() - timedelta(days=datetime.now().weekday())).strftime("%Y-%m-%d")
|
|
report = []
|
report.append("# 📊 记忆管理周报")
|
report.append(f"**周期**: {week_start} ~ {today_str}")
|
report.append(f"**生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M')}")
|
report.append("")
|
report.append("---")
|
report.append("")
|
|
# L0状态
|
l0_status = check_l0_size()
|
report.append("## 📋 L0层 (MEMORY.md)")
|
if l0_status["exists"]:
|
report.append(f"- **大小**: {l0_status['size_kb']:.1f}KB / 4KB")
|
if l0_status["status"] == "ok":
|
report.append("- **状态**: ✅ 正常")
|
elif l0_status["status"] == "warning":
|
report.append("- **状态**: ⚠️ 接近限制,建议归档")
|
else:
|
report.append("- **状态**: 🚨 超过红线,需要立即归档")
|
else:
|
report.append("- **状态**: ❌ 文件不存在")
|
report.append("")
|
|
# L2统计
|
journal_count = count_journal_files()
|
report.append("## 📝 L2层 (Journal)")
|
report.append(f"- **本周新增**: {journal_count} 条记录")
|
report.append("")
|
|
# L1统计
|
milestone_count = count_milestone_files()
|
report.append("## 🗂️ L1层 (Milestones)")
|
report.append(f"- **里程碑总数**: {milestone_count} 个主题")
|
report.append("")
|
|
# 维护任务
|
report.append("## 🔧 本周维护任务")
|
|
# 尝试运行memory-merger
|
success, output = run_memory_merger()
|
if success:
|
report.append("- ✅ L2→L1合并完成")
|
if output.strip():
|
report.append(f"- 📄 合并详情:\n```\n{output}\n```")
|
else:
|
report.append(f"- ❌ L2→L1合并失败: {output}")
|
|
if l0_status["status"] in ["warning", "over_limit"]:
|
report.append("- ⚠️ L0层需要归档整理")
|
|
report.append("")
|
report.append("---")
|
report.append("")
|
report.append("*由memory-management技能自动生成*")
|
|
return "\n".join(report)
|
|
|
def main():
|
"""主函数。"""
|
import argparse
|
parser = argparse.ArgumentParser(description="三层记忆每周维护")
|
parser.add_argument("--send-report", action="store_true", help="发送报告到飞书")
|
parser.add_argument("--output", type=str, help="报告输出文件路径")
|
args = parser.parse_args()
|
|
print("🔄 开始执行每周维护...")
|
print("=" * 50)
|
|
# 生成报告
|
report = generate_report()
|
|
# 输出到文件
|
if args.output:
|
with open(args.output, 'w', encoding='utf-8') as f:
|
f.write(report)
|
print(f"✅ 报告已保存到: {args.output}")
|
|
# 打印报告
|
print("\n" + report)
|
|
# 发送到飞书(如果需要)
|
if args.send_report:
|
print("\n📤 发送到飞书...")
|
# 这里会调用message工具,但在脚本中我们通过stdout返回
|
print(report)
|
|
print("\n" + "=" * 50)
|
print("✅ 每周维护完成")
|
|
return 0
|
|
|
if __name__ == "__main__":
|
sys.exit(main())
|