knowledge-genome-orchestrator/skills/ingest/scripts/ingest-semantic.py

277 lines
12 KiB
Python

#!/usr/bin/env python3
# =============================================================================
# skills/ingest/scripts/ingest-semantic.py
# Phase 1 (semantic) of the Knowledge Genome ingest — the LIGHT version.
#
# The model does ONLY semantic extraction and returns ONE schema-constrained JSON
# object (no tools, no file writing, no git, no frontmatter, no slugs). This script
# then CONFORMS that output deterministically into wiki pages with enforced
# frontmatter + kebab-case paths, and writes a .ingest-manifest.json in EXACTLY the
# schema run-ingest.sh expects. run-ingest.sh (phase 2) then does index / log /
# scoped-lint / PR, unchanged.
#
# cd <genome checkout>
# ingest-semantic.py <genome> raw/articles/<file>.md # phase 1 (this)
# run-ingest.sh <genome> # phase 2 (deterministic)
#
# Why this shape: local tool-calling via pi/ollama proved fragile, and a small
# model does not reliably honour folders / naming / frontmatter / manifest schema
# when it writes files itself. Here the model cannot break the contract because it
# never touches the filesystem — the script owns all structure. Stdlib only.
#
# Emits a single JSON status line on stdout (for n8n / logs).
# =============================================================================
import json, os, re, sys, datetime, urllib.request, urllib.error
# --- config (override via env; these live in ~/.config/knowledge-genome.env) ---
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434/api/chat")
MODEL = os.environ.get("INGEST_MODEL", "qwen2.5:14b")
NUM_CTX = int(os.environ.get("INGEST_NUM_CTX", "16384"))
TIMEOUT = int(os.environ.get("INGEST_TIMEOUT", "600"))
TODAY = datetime.date.today().isoformat()
def die(stage, reason):
print(json.dumps({"status": "error", "stage": stage, "reason": reason}))
sys.exit(1)
# --- args + pre-flight (mirror the old skill's guards, enforced in code) ---
if len(sys.argv) < 3:
die("args", "usage: ingest-semantic.py <genome> <raw/rel/path.md>")
genome = sys.argv[1]
raw_rel = sys.argv[2].lstrip("./")
if "private/" in raw_rel or raw_rel.startswith("private"):
die("preflight", "refusing private source: " + raw_rel)
if os.environ.get("PRIVATE_CONTEXT", "disabled") != "disabled":
die("preflight", "PRIVATE_CONTEXT must be disabled")
if not raw_rel.startswith("raw/"):
die("preflight", "source must live under raw/: " + raw_rel)
if not os.path.isfile(raw_rel):
die("preflight", "source not found in cwd: " + raw_rel)
with open(raw_rel, "r", encoding="utf-8") as fh:
source_text = fh.read()
if not source_text.strip():
die("preflight", "source is empty: " + raw_rel)
# --- the semantic contract (authoritative copy; SKILL.md documents it) ---
SYSTEM_PROMPT = """You perform the SEMANTIC PASS of a single source into a knowledge wiki.
Read the source and return ONLY structured data describing what it contains.
You do not write files, you do not produce frontmatter, and you do not invent
paths, slugs, branches, commits or PRs — a deterministic script does all of that.
Rules:
- source_summary: a faithful, self-contained summary of the source, in the
source's own language. Plain prose, no markdown headings.
- key_points: the handful of concrete facts/claims worth indexing.
- entities: every person, tool, organisation or product the source names.
kind is one of person|tool|org|product. description is one or two factual
sentences. No markdown headings inside the description.
- concepts: every pattern, theory, decision or named idea the source explains.
description is one or two factual sentences.
- contradictions: ONLY when the source makes a claim that directly contradicts a
widely-known fact or contradicts itself. Otherwise return an empty list.
- Names must be the natural name of the thing; the script will normalise them.
Do not pad. Be faithful to the source."""
# --- JSON schema -> constrained decoding (Ollama structured outputs) ---
SCHEMA = {
"type": "object",
"properties": {
"source_title": {"type": "string"},
"source_summary": {"type": "string"},
"key_points": {"type": "array", "items": {"type": "string"}},
"entities": {"type": "array", "items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"kind": {"type": "string",
"enum": ["person", "tool", "org", "product"]},
"description": {"type": "string"},
},
"required": ["name", "description"],
}},
"concepts": {"type": "array", "items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"description": {"type": "string"},
},
"required": ["name", "description"],
}},
"contradictions": {"type": "array", "items": {
"type": "object",
"properties": {
"concept": {"type": "string"},
"description": {"type": "string"},
},
"required": ["concept", "description"],
}},
"reasoning": {"type": "string"},
"pr_summary": {"type": "string"},
},
"required": ["source_title", "source_summary", "entities", "concepts"],
}
def call_model():
payload = {
"model": MODEL,
"messages": [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content":
"Source path: " + raw_rel + "\n\n--- SOURCE START ---\n"
+ source_text + "\n--- SOURCE END ---\n\nReturn the JSON now."},
],
"format": SCHEMA, # schema-constrained generation
"stream": False,
# deterministic extraction; repetition penalties OFF for structured output
"options": {"temperature": 0.2, "repeat_penalty": 1.0, "num_ctx": NUM_CTX},
}
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
OLLAMA_URL, data=data, headers={"Content-Type": "application/json"})
try:
with urllib.request.urlopen(req, timeout=TIMEOUT) as r:
resp = json.loads(r.read().decode("utf-8"))
except urllib.error.URLError as e:
die("model", "ollama request failed: " + str(e))
content = ((resp.get("message") or {}).get("content") or "").strip()
# schema-constrained, but stay defensive if a model wraps it in a fence
if content.startswith("```"):
content = content.strip("`")
brace = content.find("{")
if brace >= 0:
content = content[brace:]
try:
return json.loads(content)
except json.JSONDecodeError as e:
die("model", "model did not return valid JSON: " + str(e))
# --- conform helpers (the script OWNS all structure) ---
def slugify(s):
s = re.sub(r"[^a-z0-9]+", "-", (s or "").strip().lower())
return re.sub(r"-+", "-", s).strip("-") or "untitled"
def twords(s, n=12):
s = " ".join((s or "").split())
w = s.split(" ")
return s if len(w) <= n else " ".join(w[:n]) + ""
def frontmatter(ptype, tags):
taglist = "[" + ", ".join(sorted(set(t for t in tags if t))) + "]"
return ("---\n"
f"type: {ptype}\n"
f"domain: {genome}\n"
"maturity: draft\n"
f"last_updated: {TODAY}\n"
"private: false\n"
f"tags: {taglist}\n"
"---\n")
def write_new(path, ptype, title, body, tags):
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
f.write(frontmatter(ptype, tags))
f.write(f"\n# {title}\n\n{body}\n")
def append_section(path, source_slug, body):
# never overwrite an existing page: accumulate, attributed to the new source
with open(path, "a", encoding="utf-8") as f:
f.write(f"\n\n## From [[sources/{source_slug}]]\n\n{body}\n")
try: # best-effort bump of last_updated in the existing frontmatter
with open(path, "r", encoding="utf-8") as f:
txt = f.read()
txt = re.sub(r"(?m)^last_updated:.*$", "last_updated: " + TODAY, txt, count=1)
with open(path, "w", encoding="utf-8") as f:
f.write(txt)
except Exception:
pass
# --- run the semantic pass ---
sem = call_model()
source_slug = slugify(os.path.splitext(os.path.basename(raw_rel))[0])
pages = []
# 1. source page — canonical summary of THIS source (re)written
src_path = f"wiki/sources/{source_slug}.md"
src_status = "modified" if os.path.exists(src_path) else "created"
kp_lines = "\n".join("- " + p for p in (sem.get("key_points") or []) if p.strip())
src_body = (sem.get("source_summary") or "").strip()
if kp_lines:
src_body += "\n\n## Key points\n\n" + kp_lines
src_body += f"\n\n## Source\n\n- [[{raw_rel}]]\n"
src_tags = ([slugify(e.get("name", "")) for e in sem.get("entities", [])]
+ [slugify(c.get("name", "")) for c in sem.get("concepts", [])])[:8]
os.makedirs("wiki/sources", exist_ok=True)
with open(src_path, "w", encoding="utf-8") as f:
f.write(frontmatter("source", src_tags))
f.write(f"\n# {sem.get('source_title') or source_slug}\n\n{src_body}\n")
pages.append({"path": src_path,
"summary": twords(sem.get("source_title") or source_slug),
"maturity": "draft", "status": src_status})
def handle(kind_dir, ptype, items):
for it in items or []:
name = (it.get("name") or "").strip()
if not name:
continue
slug = slugify(name)
path = f"wiki/{kind_dir}/{slug}.md"
desc = (it.get("description") or "").strip()
if os.path.exists(path):
append_section(path, source_slug, desc)
pages.append({"path": path, "summary": twords(desc), "status": "modified"})
else:
body = desc + f"\n\n## Sources\n\n- [[sources/{source_slug}]]\n"
write_new(path, ptype, name, body, [genome, ptype])
pages.append({"path": path, "summary": twords(desc),
"maturity": "draft", "status": "created"})
# 2. entities, 3. concepts
handle("entities", "entity", sem.get("entities", []))
handle("concepts", "concept", sem.get("concepts", []))
# 4. contradictions -> conflict pages (run-ingest routes wiki/queries/conflict-*)
conflicts = sem.get("contradictions") or []
conf_slugs = []
for c in conflicts:
cslug = slugify(c.get("concept", "unknown"))
conf_slugs.append(cslug)
path = f"wiki/queries/conflict-{cslug}-{TODAY}.md"
write_new(path, "query", f"Conflict: {c.get('concept', '')}",
(c.get("description") or "").strip()
+ f"\n\n## Source\n\n- [[sources/{source_slug}]]\n",
[genome, "conflict"])
pages.append({"path": path, "summary": "", "maturity": "draft",
"status": "created"})
contradictions_str = ("None" if not conflicts
else f"{len(conflicts)} conflict file(s) created — "
+ ", ".join(conf_slugs))
# --- write the manifest in EXACTLY run-ingest.sh's schema ---
manifest = {
"raw_source": raw_rel,
"reasoning": sem.get("reasoning") or ("Ingest of " + raw_rel),
"pr_summary": sem.get("pr_summary") or ("Semantic ingest of " + raw_rel),
"contradictions": contradictions_str,
"pages": pages,
}
with open(".ingest-manifest.json", "w", encoding="utf-8") as f:
json.dump(manifest, f, indent=2, ensure_ascii=False)
print(json.dumps({"status": "ok", "stage": "semantic",
"pages": len(pages), "model": MODEL,
"manifest": ".ingest-manifest.json"}))