#!/usr/bin/env python3 """ Ontology graph operations: create, query, relate, validate. Usage: python ontology.py create --type Person --props '{"name":"Alice"}' python ontology.py get --id p_001 python ontology.py query --type Task --where '{"status":"open"}' python ontology.py relate --from proj_001 --rel has_task --to task_001 python ontology.py related --id proj_001 --rel has_task python ontology.py list --type Person python ontology.py delete --id p_001 python ontology.py validate """ import argparse import json import uuid from datetime import datetime, timezone from pathlib import Path DEFAULT_GRAPH_PATH = "memory/ontology/graph.jsonl" DEFAULT_SCHEMA_PATH = "memory/ontology/schema.yaml" def resolve_safe_path( user_path: str, *, root: Path | None = None, must_exist: bool = False, label: str = "path", ) -> Path: """Resolve user path within root and reject traversal outside it.""" if not user_path or not user_path.strip(): raise SystemExit(f"Invalid {label}: empty path") safe_root = (root or Path.cwd()).resolve() candidate = Path(user_path).expanduser() if not candidate.is_absolute(): candidate = safe_root / candidate try: resolved = candidate.resolve(strict=False) except OSError as exc: raise SystemExit(f"Invalid {label}: {exc}") from exc try: resolved.relative_to(safe_root) except ValueError: raise SystemExit( f"Invalid {label}: must stay within workspace root '{safe_root}'" ) if must_exist and not resolved.exists(): raise SystemExit(f"Invalid {label}: file not found '{resolved}'") return resolved def generate_id(type_name: str) -> str: """Generate a unique ID for an entity.""" prefix = type_name.lower()[:4] suffix = uuid.uuid4().hex[:8] return f"{prefix}_{suffix}" def load_graph(path: str) -> tuple[dict, list]: """Load entities and relations from graph file.""" entities = {} relations = [] graph_path = Path(path) if not graph_path.exists(): return entities, relations with open(graph_path) as f: for line in f: line = line.strip() if not line: continue record = json.loads(line) op = record.get("op") if op == "create": entity = record["entity"] entities[entity["id"]] = entity elif op == "update": entity_id = record["id"] if entity_id in entities: entities[entity_id]["properties"].update(record.get("properties", {})) entities[entity_id]["updated"] = record.get("timestamp") elif op == "delete": entity_id = record["id"] entities.pop(entity_id, None) elif op == "relate": relations.append({ "from": record["from"], "rel": record["rel"], "to": record["to"], "properties": record.get("properties", {}) }) elif op == "unrelate": relations = [r for r in relations if not (r["from"] == record["from"] and r["rel"] == record["rel"] and r["to"] == record["to"])] return entities, relations def append_op(path: str, record: dict): """Append an operation to the graph file.""" graph_path = Path(path) graph_path.parent.mkdir(parents=True, exist_ok=True) with open(graph_path, "a") as f: f.write(json.dumps(record) + "\n") def create_entity(type_name: str, properties: dict, graph_path: str, entity_id: str = None) -> dict: """Create a new entity.""" entity_id = entity_id or generate_id(type_name) timestamp = datetime.now(timezone.utc).isoformat() entity = { "id": entity_id, "type": type_name, "properties": properties, "created": timestamp, "updated": timestamp } record = {"op": "create", "entity": entity, "timestamp": timestamp} append_op(graph_path, record) return entity def get_entity(entity_id: str, graph_path: str) -> dict | None: """Get entity by ID.""" entities, _ = load_graph(graph_path) return entities.get(entity_id) def query_entities(type_name: str, where: dict, graph_path: str) -> list: """Query entities by type and properties.""" entities, _ = load_graph(graph_path) results = [] for entity in entities.values(): if type_name and entity["type"] != type_name: continue match = True for key, value in where.items(): if entity["properties"].get(key) != value: match = False break if match: results.append(entity) return results def list_entities(type_name: str, graph_path: str) -> list: """List all entities of a type.""" entities, _ = load_graph(graph_path) if type_name: return [e for e in entities.values() if e["type"] == type_name] return list(entities.values()) def update_entity(entity_id: str, properties: dict, graph_path: str) -> dict | None: """Update entity properties.""" entities, _ = load_graph(graph_path) if entity_id not in entities: return None timestamp = datetime.now(timezone.utc).isoformat() record = {"op": "update", "id": entity_id, "properties": properties, "timestamp": timestamp} append_op(graph_path, record) entities[entity_id]["properties"].update(properties) entities[entity_id]["updated"] = timestamp return entities[entity_id] def delete_entity(entity_id: str, graph_path: str) -> bool: """Delete an entity.""" entities, _ = load_graph(graph_path) if entity_id not in entities: return False timestamp = datetime.now(timezone.utc).isoformat() record = {"op": "delete", "id": entity_id, "timestamp": timestamp} append_op(graph_path, record) return True def create_relation(from_id: str, rel_type: str, to_id: str, properties: dict, graph_path: str): """Create a relation between entities.""" timestamp = datetime.now(timezone.utc).isoformat() record = { "op": "relate", "from": from_id, "rel": rel_type, "to": to_id, "properties": properties, "timestamp": timestamp } append_op(graph_path, record) return record def get_related(entity_id: str, rel_type: str, graph_path: str, direction: str = "outgoing") -> list: """Get related entities.""" entities, relations = load_graph(graph_path) results = [] for rel in relations: if direction == "outgoing" and rel["from"] == entity_id: if not rel_type or rel["rel"] == rel_type: if rel["to"] in entities: results.append({ "relation": rel["rel"], "entity": entities[rel["to"]] }) elif direction == "incoming" and rel["to"] == entity_id: if not rel_type or rel["rel"] == rel_type: if rel["from"] in entities: results.append({ "relation": rel["rel"], "entity": entities[rel["from"]] }) elif direction == "both": if rel["from"] == entity_id or rel["to"] == entity_id: if not rel_type or rel["rel"] == rel_type: other_id = rel["to"] if rel["from"] == entity_id else rel["from"] if other_id in entities: results.append({ "relation": rel["rel"], "direction": "outgoing" if rel["from"] == entity_id else "incoming", "entity": entities[other_id] }) return results def validate_graph(graph_path: str, schema_path: str) -> list: """Validate graph against schema constraints.""" entities, relations = load_graph(graph_path) errors = [] # Load schema if exists schema = load_schema(schema_path) type_schemas = schema.get("types", {}) relation_schemas = schema.get("relations", {}) global_constraints = schema.get("constraints", []) for entity_id, entity in entities.items(): type_name = entity["type"] type_schema = type_schemas.get(type_name, {}) # Check required properties required = type_schema.get("required", []) for prop in required: if prop not in entity["properties"]: errors.append(f"{entity_id}: missing required property '{prop}'") # Check forbidden properties forbidden = type_schema.get("forbidden_properties", []) for prop in forbidden: if prop in entity["properties"]: errors.append(f"{entity_id}: contains forbidden property '{prop}'") # Check enum values for prop, allowed in type_schema.items(): if prop.endswith("_enum"): field = prop.replace("_enum", "") value = entity["properties"].get(field) if value and value not in allowed: errors.append(f"{entity_id}: '{field}' must be one of {allowed}, got '{value}'") # Relation constraints (type + cardinality + acyclicity) rel_index = {} for rel in relations: rel_index.setdefault(rel["rel"], []).append(rel) for rel_type, rel_schema in relation_schemas.items(): rels = rel_index.get(rel_type, []) from_types = rel_schema.get("from_types", []) to_types = rel_schema.get("to_types", []) cardinality = rel_schema.get("cardinality") acyclic = rel_schema.get("acyclic", False) # Type checks for rel in rels: from_entity = entities.get(rel["from"]) to_entity = entities.get(rel["to"]) if not from_entity or not to_entity: errors.append(f"{rel_type}: relation references missing entity ({rel['from']} -> {rel['to']})") continue if from_types and from_entity["type"] not in from_types: errors.append( f"{rel_type}: from entity {rel['from']} type {from_entity['type']} not in {from_types}" ) if to_types and to_entity["type"] not in to_types: errors.append( f"{rel_type}: to entity {rel['to']} type {to_entity['type']} not in {to_types}" ) # Cardinality checks if cardinality in ("one_to_one", "one_to_many", "many_to_one"): from_counts = {} to_counts = {} for rel in rels: from_counts[rel["from"]] = from_counts.get(rel["from"], 0) + 1 to_counts[rel["to"]] = to_counts.get(rel["to"], 0) + 1 if cardinality in ("one_to_one", "many_to_one"): for from_id, count in from_counts.items(): if count > 1: errors.append(f"{rel_type}: from entity {from_id} violates cardinality {cardinality}") if cardinality in ("one_to_one", "one_to_many"): for to_id, count in to_counts.items(): if count > 1: errors.append(f"{rel_type}: to entity {to_id} violates cardinality {cardinality}") # Acyclic checks if acyclic: graph = {} for rel in rels: graph.setdefault(rel["from"], []).append(rel["to"]) visited = {} def dfs(node, stack): visited[node] = True stack.add(node) for nxt in graph.get(node, []): if nxt in stack: return True if not visited.get(nxt, False): if dfs(nxt, stack): return True stack.remove(node) return False for node in graph: if not visited.get(node, False): if dfs(node, set()): errors.append(f"{rel_type}: cyclic dependency detected") break # Global constraints (limited enforcement) for constraint in global_constraints: ctype = constraint.get("type") relation = constraint.get("relation") rule = (constraint.get("rule") or "").strip().lower() if ctype == "Event" and "end" in rule and "start" in rule: for entity_id, entity in entities.items(): if entity["type"] != "Event": continue start = entity["properties"].get("start") end = entity["properties"].get("end") if start and end: try: start_dt = datetime.fromisoformat(start) end_dt = datetime.fromisoformat(end) if end_dt < start_dt: errors.append(f"{entity_id}: end must be >= start") except ValueError: errors.append(f"{entity_id}: invalid datetime format in start/end") if relation and rule == "acyclic": # Already enforced above via relations schema continue return errors def load_schema(schema_path: str) -> dict: """Load schema from YAML if it exists.""" schema = {} schema_file = Path(schema_path) if schema_file.exists(): import yaml with open(schema_file) as f: schema = yaml.safe_load(f) or {} return schema def write_schema(schema_path: str, schema: dict) -> None: """Write schema to YAML.""" schema_file = Path(schema_path) schema_file.parent.mkdir(parents=True, exist_ok=True) import yaml with open(schema_file, "w") as f: yaml.safe_dump(schema, f, sort_keys=False) def merge_schema(base: dict, incoming: dict) -> dict: """Merge incoming schema into base, appending lists and deep-merging dicts.""" for key, value in (incoming or {}).items(): if key in base and isinstance(base[key], dict) and isinstance(value, dict): base[key] = merge_schema(base[key], value) elif key in base and isinstance(base[key], list) and isinstance(value, list): base[key] = base[key] + [v for v in value if v not in base[key]] else: base[key] = value return base def append_schema(schema_path: str, incoming: dict) -> dict: """Append/merge schema fragment into existing schema.""" base = load_schema(schema_path) merged = merge_schema(base, incoming) write_schema(schema_path, merged) return merged def main(): parser = argparse.ArgumentParser(description="Ontology graph operations") subparsers = parser.add_subparsers(dest="command", required=True) # Create create_p = subparsers.add_parser("create", help="Create entity") create_p.add_argument("--type", "-t", required=True, help="Entity type") create_p.add_argument("--props", "-p", default="{}", help="Properties JSON") create_p.add_argument("--id", help="Entity ID (auto-generated if not provided)") create_p.add_argument("--graph", "-g", default=DEFAULT_GRAPH_PATH) # Get get_p = subparsers.add_parser("get", help="Get entity by ID") get_p.add_argument("--id", required=True, help="Entity ID") get_p.add_argument("--graph", "-g", default=DEFAULT_GRAPH_PATH) # Query query_p = subparsers.add_parser("query", help="Query entities") query_p.add_argument("--type", "-t", help="Entity type") query_p.add_argument("--where", "-w", default="{}", help="Filter JSON") query_p.add_argument("--graph", "-g", default=DEFAULT_GRAPH_PATH) # List list_p = subparsers.add_parser("list", help="List entities") list_p.add_argument("--type", "-t", help="Entity type") list_p.add_argument("--graph", "-g", default=DEFAULT_GRAPH_PATH) # Update update_p = subparsers.add_parser("update", help="Update entity") update_p.add_argument("--id", required=True, help="Entity ID") update_p.add_argument("--props", "-p", required=True, help="Properties JSON") update_p.add_argument("--graph", "-g", default=DEFAULT_GRAPH_PATH) # Delete delete_p = subparsers.add_parser("delete", help="Delete entity") delete_p.add_argument("--id", required=True, help="Entity ID") delete_p.add_argument("--graph", "-g", default=DEFAULT_GRAPH_PATH) # Relate relate_p = subparsers.add_parser("relate", help="Create relation") relate_p.add_argument("--from", dest="from_id", required=True, help="From entity ID") relate_p.add_argument("--rel", "-r", required=True, help="Relation type") relate_p.add_argument("--to", dest="to_id", required=True, help="To entity ID") relate_p.add_argument("--props", "-p", default="{}", help="Relation properties JSON") relate_p.add_argument("--graph", "-g", default=DEFAULT_GRAPH_PATH) # Related related_p = subparsers.add_parser("related", help="Get related entities") related_p.add_argument("--id", required=True, help="Entity ID") related_p.add_argument("--rel", "-r", help="Relation type filter") related_p.add_argument("--dir", "-d", choices=["outgoing", "incoming", "both"], default="outgoing") related_p.add_argument("--graph", "-g", default=DEFAULT_GRAPH_PATH) # Validate validate_p = subparsers.add_parser("validate", help="Validate graph") validate_p.add_argument("--graph", "-g", default=DEFAULT_GRAPH_PATH) validate_p.add_argument("--schema", "-s", default=DEFAULT_SCHEMA_PATH) # Schema append schema_p = subparsers.add_parser("schema-append", help="Append/merge schema fragment") schema_p.add_argument("--schema", "-s", default=DEFAULT_SCHEMA_PATH) schema_p.add_argument("--data", "-d", help="Schema fragment as JSON") schema_p.add_argument("--file", "-f", help="Schema fragment file (YAML or JSON)") args = parser.parse_args() workspace_root = Path.cwd().resolve() if hasattr(args, "graph"): args.graph = str( resolve_safe_path(args.graph, root=workspace_root, label="graph path") ) if hasattr(args, "schema"): args.schema = str( resolve_safe_path(args.schema, root=workspace_root, label="schema path") ) if hasattr(args, "file") and args.file: args.file = str( resolve_safe_path( args.file, root=workspace_root, must_exist=True, label="schema file" ) ) if args.command == "create": props = json.loads(args.props) entity = create_entity(args.type, props, args.graph, args.id) print(json.dumps(entity, indent=2)) elif args.command == "get": entity = get_entity(args.id, args.graph) if entity: print(json.dumps(entity, indent=2)) else: print(f"Entity not found: {args.id}") elif args.command == "query": where = json.loads(args.where) results = query_entities(args.type, where, args.graph) print(json.dumps(results, indent=2)) elif args.command == "list": results = list_entities(args.type, args.graph) print(json.dumps(results, indent=2)) elif args.command == "update": props = json.loads(args.props) entity = update_entity(args.id, props, args.graph) if entity: print(json.dumps(entity, indent=2)) else: print(f"Entity not found: {args.id}") elif args.command == "delete": if delete_entity(args.id, args.graph): print(f"Deleted: {args.id}") else: print(f"Entity not found: {args.id}") elif args.command == "relate": props = json.loads(args.props) rel = create_relation(args.from_id, args.rel, args.to_id, props, args.graph) print(json.dumps(rel, indent=2)) elif args.command == "related": results = get_related(args.id, args.rel, args.graph, args.dir) print(json.dumps(results, indent=2)) elif args.command == "validate": errors = validate_graph(args.graph, args.schema) if errors: print("Validation errors:") for err in errors: print(f" - {err}") else: print("Graph is valid.") elif args.command == "schema-append": if not args.data and not args.file: raise SystemExit("schema-append requires --data or --file") incoming = {} if args.data: incoming = json.loads(args.data) else: path = Path(args.file) if path.suffix.lower() == ".json": with open(path) as f: incoming = json.load(f) else: import yaml with open(path) as f: incoming = yaml.safe_load(f) or {} merged = append_schema(args.schema, incoming) print(json.dumps(merged, indent=2)) if __name__ == "__main__": main()