#!/usr/bin/env python3
|
"""
|
每周维护脚本
|
周一早上9:30执行,负责:
|
1. 自动合并L2→L1(基于内容筛选策略)
|
2. 检查L0大小
|
3. 生成周报
|
4. 发送报告(可选)
|
"""
|
|
import os
|
import sys
|
import subprocess
|
import re
|
from datetime import datetime, timedelta
|
from pathlib import Path
|
|
|
def get_workspace_path() -> Path:
    """Return the OpenClaw workspace directory under the user's home."""
    return Path.home().joinpath(".openclaw", "workspace")
|
|
|
# Keyword tables deciding whether an L2 journal entry deserves promotion to
# L1, checked in priority order: the first category with a matching keyword
# wins.  Lower-casing only affects the English keywords ("resolved", "bug");
# the Chinese ones are case-insensitive by nature.
_MERGE_CATEGORIES = [
    # Strategy 1: important decisions
    ("决策记录", [
        "决策", "决定", "结论", "方案", "选择",
        "采用", "确定", "最终", "resolved", "解决",
        "关键", "重要", "核心", "原则",
    ]),
    # Strategy 2: technical solutions
    ("技术方案", [
        "架构", "设计", "实现", "配置", "优化",
        "部署", "迁移", "升级", "重构", "方案",
    ]),
    # Strategy 3: lessons learned
    ("经验教训", [
        "教训", "经验", "学习", "注意", "避免",
        "问题", "bug", "错误", "失败原因",
    ]),
    # Strategy 4: process / policy
    ("流程规范", [
        "流程", "规范", "规则", "约定", "标准",
        "红线", "必须", "禁止", "要求",
    ]),
]


def should_merge_to_l1(content: str) -> tuple:
    """Decide whether an L2 journal entry should be merged into L1.

    Args:
        content: Raw markdown text of a single journal event.

    Returns:
        (should_merge, reason) — on acceptance ``reason`` is the matched
        category name; on rejection it is a human-readable explanation.
    """
    # Cheap rejection filters first.  (The original scanned keywords before
    # these checks and then threw the category away on rejection — hoisting
    # the filters is behavior-identical and skips the pointless scan.)

    # Strategy 5: entries shorter than 200 chars are not worth merging.
    if len(content) < 200:
        return False, "内容过短"

    # Strategy 6: only entries with a "###" heading count as formal records.
    if not re.search(r'###\s+', content):
        return False, "缺乏结构化标记"

    content_lower = content.lower()
    for category, keywords in _MERGE_CATEGORIES:
        if any(kw in content_lower for kw in keywords):
            return True, category

    return False, "未匹配合并策略"
|
|
|
def is_duplicate_in_l1(content: str, l1_file: Path) -> bool:
    """Report whether *content* already appears in the L1 file.

    Dedup is fingerprint-based: the first 100 characters of the entry
    (stripped) are searched for verbatim in the L1 text.  A missing file
    or any read error counts as "not a duplicate".
    """
    if not l1_file.exists():
        return False

    fingerprint = content[:100].strip()
    try:
        return fingerprint in l1_file.read_text(encoding='utf-8')
    except Exception:
        return False
|
|
|
def auto_merge_l2_to_l1() -> dict:
    """Merge this week's qualifying L2 journal events into the monthly L1 file.

    Scans memory/journal/*.md whose filename-date falls inside the current
    week, splits each file into "## [" events, filters them through
    should_merge_to_l1() and is_duplicate_in_l1(), and appends accepted
    events (as 500-char summaries) to memory/milestones/<YYYY-MM>-weekly.md.

    Returns:
        dict with keys: status ("no_journal" | "success" | "no_merge"),
        merged, skipped, l1_file (path str, or None when nothing merged),
        details (per-event action log).
    """
    workspace = get_workspace_path()
    journal_dir = workspace / "memory" / "journal"
    milestones_dir = workspace / "memory" / "milestones"

    if not journal_dir.exists():
        return {"status": "no_journal", "merged": 0, "skipped": 0, "details": []}

    # Make sure the milestones directory exists before appending to it.
    milestones_dir.mkdir(parents=True, exist_ok=True)

    # Current week's date range.  start_of_week is truncated to midnight:
    # journal filenames parse to midnight datetimes, so without truncation
    # Monday's own journal would always compare < start_of_week (which
    # carried the current time of day) and be silently skipped.
    today = datetime.now()
    start_of_week = (today - timedelta(days=today.weekday())).replace(
        hour=0, minute=0, second=0, microsecond=0
    )

    # This month's L1 target file.
    current_month = today.strftime("%Y-%m")
    l1_file = milestones_dir / f"{current_month}-weekly.md"

    merged_count = 0
    skipped_count = 0
    details = []

    # Walk this week's L2 files in date order.
    for l2_file in sorted(journal_dir.glob("*.md")):
        try:
            file_date = datetime.strptime(l2_file.stem, "%Y-%m-%d")
            if not (start_of_week <= file_date <= today):
                continue

            l2_content = l2_file.read_text(encoding='utf-8')

            # Split into events on "## [" headings.
            events = re.split(r'\n##\s+\[', l2_content)

            for event in events[1:]:  # events[0] is the file preamble
                event = "## [" + event  # restore the heading marker

                # Event title: the text after the "[...]" timestamp tag.
                title_match = re.search(r'##\s*\[.*?\]\s*(.+?)\n', event)
                title = title_match.group(1).strip() if title_match else "未命名事件"

                # Content-based merge decision.
                should_merge, reason = should_merge_to_l1(event)
                if not should_merge:
                    skipped_count += 1
                    details.append({
                        "date": l2_file.stem,
                        "title": title,
                        "action": "跳过",
                        "reason": reason
                    })
                    continue

                # Skip entries already present in L1.
                if is_duplicate_in_l1(event, l1_file):
                    skipped_count += 1
                    details.append({
                        "date": l2_file.stem,
                        "title": title,
                        "action": "跳过",
                        "reason": "L1中已存在"
                    })
                    continue

                # Perform the merge.
                try:
                    # L1 entry: category, backlink to the L2 source, and a
                    # truncated summary of the event body.
                    l1_entry = f"""
## [{l2_file.stem}] {title}
**类别**: {reason}
**来源**: [L2详情](./journal/{l2_file.name})

### 摘要
{event[:500]}...

---
"""

                    # Append to the L1 file.
                    with open(l1_file, 'a', encoding='utf-8') as f:
                        f.write(l1_entry)

                    merged_count += 1
                    details.append({
                        "date": l2_file.stem,
                        "title": title,
                        "action": "已合并",
                        "category": reason
                    })

                except Exception as e:
                    details.append({
                        "date": l2_file.stem,
                        "title": title,
                        "action": "失败",
                        "reason": str(e)
                    })

        except ValueError:
            continue  # filename is not a YYYY-MM-DD date
        except Exception as e:
            details.append({
                "date": l2_file.stem,  # loop variable is always bound here
                "title": "读取失败",
                "action": "错误",
                "reason": str(e)
            })

    return {
        "status": "success" if merged_count > 0 else "no_merge",
        "merged": merged_count,
        "skipped": skipped_count,
        "l1_file": str(l1_file) if merged_count > 0 else None,
        "details": details
    }
|
|
|
def run_memory_merger() -> tuple:
    """Run the L2→L1 auto-merge and summarize the outcome.

    Returns:
        (ok, message) — ok is True only when at least one entry was merged;
        message is a human-readable summary either way.
    """
    result = auto_merge_l2_to_l1()
    status = result["status"]

    if status == "no_journal":
        return False, "未找到journal目录"

    if status != "success":
        return False, f"未找到可合并内容(本周共扫描 {result['skipped']} 条,均不符合合并条件)"

    parts = [
        f"✅ 合并完成: {result['merged']} 条 → {result['l1_file']}\n",
        f"⏭️ 跳过: {result['skipped']} 条\n",
        "\n详细记录:\n",
    ]
    # Show only the last five per-event records.
    for entry in result['details'][-5:]:
        line = f"  - [{entry['date']}] {entry['title']}: {entry['action']}"
        if 'reason' in entry:
            line += f" ({entry['reason']})"
        parts.append(line + "\n")
    return True, "".join(parts)
|
|
|
def check_l0_size() -> dict:
    """Report on the L0 file (MEMORY.md): existence, size and status.

    Status thresholds (bytes): <= 3500 → "ok", 3501–4096 → "warning",
    > 4096 → "over_limit"; "missing" when the file does not exist.
    """
    memory_file = get_workspace_path() / "MEMORY.md"

    if not memory_file.exists():
        return {"exists": False, "size": 0, "status": "missing"}

    size = memory_file.stat().st_size
    if size > 4096:
        status = "over_limit"
    elif size > 3500:
        status = "warning"
    else:
        status = "ok"

    return {
        "exists": True,
        "size": size,
        "size_kb": size / 1024,
        "status": status
    }
|
|
|
def count_journal_files() -> int:
    """Count this week's L2 journal files (filenames of the form YYYY-MM-DD.md).

    start_of_week is truncated to midnight so that Monday's journal — whose
    filename parses to a midnight datetime — is included; comparing against
    a datetime carrying the current time of day would silently exclude it
    (same fix as in auto_merge_l2_to_l1).

    Returns:
        Number of journal files dated within the current week, 0 when the
        journal directory does not exist.
    """
    workspace = get_workspace_path()
    journal_dir = workspace / "memory" / "journal"

    if not journal_dir.exists():
        return 0

    # Current week's date range, start truncated to midnight.
    today = datetime.now()
    start_of_week = (today - timedelta(days=today.weekday())).replace(
        hour=0, minute=0, second=0, microsecond=0
    )

    count = 0
    for f in journal_dir.glob("*.md"):
        try:
            file_date = datetime.strptime(f.stem, "%Y-%m-%d")
            if start_of_week <= file_date <= today:
                count += 1
        except ValueError:
            continue  # non-date filename: ignore

    return count
|
|
|
def count_milestone_files() -> int:
    """Count L1 milestone files (*.md under memory/milestones)."""
    milestones_dir = get_workspace_path() / "memory" / "milestones"

    if not milestones_dir.exists():
        return 0

    return sum(1 for _ in milestones_dir.glob("*.md"))
|
|
|
def generate_report() -> str:
    """Assemble the weekly memory-maintenance report as markdown.

    Side effect: triggers the actual L2→L1 merge via run_memory_merger().
    """
    today_str = datetime.now().strftime("%Y-%m-%d")
    week_start = (datetime.now() - timedelta(days=datetime.now().weekday())).strftime("%Y-%m-%d")

    lines = [
        "# 📊 记忆管理周报",
        f"**周期**: {week_start} ~ {today_str}",
        f"**生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
        "",
        "---",
        "",
    ]

    # --- L0 layer status ---
    l0_status = check_l0_size()
    lines.append("## 📋 L0层 (MEMORY.md)")
    if not l0_status["exists"]:
        lines.append("- **状态**: ❌ 文件不存在")
    else:
        lines.append(f"- **大小**: {l0_status['size_kb']:.1f}KB / 4KB")
        if l0_status["status"] == "ok":
            lines.append("- **状态**: ✅ 正常")
        elif l0_status["status"] == "warning":
            lines.append("- **状态**: ⚠️ 接近限制,建议归档")
        else:
            lines.append("- **状态**: 🚨 超过红线,需要立即归档")
    lines.append("")

    # --- L2 layer statistics ---
    journal_count = count_journal_files()
    lines.extend([
        "## 📝 L2层 (Journal)",
        f"- **本周新增**: {journal_count} 条记录",
        "",
    ])

    # --- L1 layer statistics ---
    milestone_count = count_milestone_files()
    lines.extend([
        "## 🗂️ L1层 (Milestones)",
        f"- **里程碑总数**: {milestone_count} 个主题",
        "",
    ])

    # --- Maintenance tasks (this call performs the merge) ---
    lines.append("## 🔧 本周维护任务")
    success, output = run_memory_merger()
    if success:
        lines.append("- ✅ L2→L1合并完成")
        if output.strip():
            lines.append(f"- 📄 合并详情:\n```\n{output}\n```")
    else:
        lines.append(f"- ❌ L2→L1合并失败: {output}")

    if l0_status["status"] in ["warning", "over_limit"]:
        lines.append("- ⚠️ L0层需要归档整理")

    lines.extend(["", "---", "", "*由memory-management技能自动生成*"])

    return "\n".join(lines)
|
|
|
def main():
    """CLI entry point: generate the weekly report and optionally emit it.

    Returns:
        Process exit code (always 0).
    """
    import argparse

    parser = argparse.ArgumentParser(description="三层记忆每周维护")
    parser.add_argument("--send-report", action="store_true", help="发送报告到飞书")
    parser.add_argument("--output", type=str, help="报告输出文件路径")
    args = parser.parse_args()

    print("🔄 开始执行每周维护...")
    print("=" * 50)

    # Build the report (also runs the L2→L1 merge).
    report = generate_report()

    # Persist to a file when requested.
    if args.output:
        with open(args.output, 'w', encoding='utf-8') as f:
            f.write(report)
        print(f"✅ 报告已保存到: {args.output}")

    # Echo the report to stdout.
    print("\n" + report)

    # Feishu delivery: the message tool consumes stdout, so the report is
    # simply re-printed here.
    if args.send_report:
        print("\n📤 发送到飞书...")
        print(report)

    print("\n" + "=" * 50)
    print("✅ 每周维护完成")

    return 0
|
|
|
if __name__ == "__main__":
|
sys.exit(main())
|