# File: claude-engineering-plugin/plugins/compound-engineering/skills/sync-confluence/scripts/sync_confluence.py
# Last commit: John Lamb 24d77808c0 — "minor updates, includes new skills
#   for just-ship-it and push to proof" (2026-03-13 18:20:27 -05:00)
# CI / test (push): cancelled
# 530 lines · 18 KiB · Python

#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = ["markdown", "requests", "truststore"]
# ///
"""Sync markdown docs to Confluence Cloud.
Reads a .confluence-mapping.json file, syncs local markdown files
to Confluence pages via REST API v1, and updates the mapping file.
Run with: uv run scripts/sync_confluence.py [options]
"""
import argparse
import base64
import json
import os
import re
import subprocess
import sys
import time
from datetime import date, timezone, datetime
from pathlib import Path
from urllib.parse import quote
import truststore
truststore.inject_into_ssl()
import markdown
import requests
# ---------------------------------------------------------------------------
# Path discovery
# ---------------------------------------------------------------------------
def find_repo_root() -> Path | None:
"""Walk up from CWD to find a git repo root."""
try:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True, text=True, check=True,
)
return Path(result.stdout.strip())
except (subprocess.CalledProcessError, FileNotFoundError):
return None
def find_mapping_file(start: Path) -> Path | None:
"""Search for .confluence-mapping.json walking up from *start*.
Checks <dir>/docs/.confluence-mapping.json and
<dir>/.confluence-mapping.json at each level.
"""
current = start.resolve()
while True:
for candidate in (
current / "docs" / ".confluence-mapping.json",
current / ".confluence-mapping.json",
):
if candidate.is_file():
return candidate
parent = current.parent
if parent == current:
break
current = parent
return None
# ---------------------------------------------------------------------------
# Mapping file helpers
# ---------------------------------------------------------------------------
def load_mapping(path: Path) -> dict:
    """Load the mapping file, validating its required top-level keys.

    Raises ValueError when 'confluence' or 'parentPage' is absent.
    Missing 'pages'/'unmapped' sections are filled with empty defaults
    so callers never need to guard against their absence.
    """
    data = json.loads(path.read_text(encoding="utf-8"))
    absent = [k for k in ("confluence", "parentPage") if k not in data]
    if absent:
        key = absent[0]
        raise ValueError(f"Mapping file missing required key: '{key}'")
    if "pages" not in data:
        data["pages"] = {}
    if "unmapped" not in data:
        data["unmapped"] = []
    return data
def save_mapping(path: Path, data: dict) -> None:
    """Serialize *data* to *path* as 2-space-indented JSON.

    A trailing newline is appended so the file diffs cleanly between
    sync runs.
    """
    serialized = json.dumps(data, indent=2)
    path.write_text(serialized + "\n", encoding="utf-8")
# ---------------------------------------------------------------------------
# Markdown → Confluence storage format
# ---------------------------------------------------------------------------
# Python-Markdown extensions enabled when converting docs for Confluence:
# tables, fenced code blocks, TOC processing, markdown inside raw HTML,
# and stricter ("sane") list parsing.
MD_EXTENSIONS = [
    "markdown.extensions.tables",
    "markdown.extensions.fenced_code",
    "markdown.extensions.toc",
    "markdown.extensions.md_in_html",
    "markdown.extensions.sane_lists",
]
# Per-extension settings: heading permalink anchors are disabled so no
# stray anchor characters appear in the rendered Confluence page.
MD_EXTENSION_CONFIGS: dict = {
    "markdown.extensions.toc": {"permalink": False},
}
def md_to_storage(md_content: str) -> str:
    """Render *md_content* as XHTML for Confluence's storage format.

    Confluence storage format is XHTML-based, so the converter is run
    with ``output_format="xhtml"`` and the module-level extension set.
    """
    rendered = markdown.markdown(
        md_content,
        extensions=MD_EXTENSIONS,
        extension_configs=MD_EXTENSION_CONFIGS,
        output_format="xhtml",
    )
    return rendered
# ---------------------------------------------------------------------------
# Title helpers
# ---------------------------------------------------------------------------
def extract_h1(md_content: str) -> str | None:
"""Return the first ``# Heading`` from *md_content*, or None."""
for line in md_content.splitlines():
stripped = line.strip()
if stripped.startswith("# ") and not stripped.startswith("## "):
return stripped[2:].strip()
return None
def title_from_filename(filename: str) -> str:
    """Build a display title from a kebab-case markdown filename.

    Each hyphen-separated word is capitalised, then well-known
    acronyms and product terms are restored to canonical casing.
    """
    stem = filename.removesuffix(".md")
    title = " ".join(part.capitalize() for part in stem.split("-"))
    # Capitalize() lower-cases acronyms ("api" -> "Api"); undo that for
    # the terms this project cares about.
    fixes = {
        "Ats": "ATS", "Api": "API", "Ms": "MS", "Unie": "UNIE",
        "Id": "ID", "Opa": "OPA", "Zi": "ZI", "Cql": "CQL",
        "Jql": "JQL", "Sdk": "SDK", "Oauth": "OAuth", "Cdn": "CDN",
        "Aws": "AWS", "Gcp": "GCP", "Grpc": "gRPC",
    }
    for bad, good in fixes.items():
        title = re.sub(rf"\b{bad}\b", good, title)
    return title
def resolve_title(filename: str, md_content: str, parent_title: str | None) -> str:
    """Choose the Confluence page title for a markdown file.

    The document's first H1 wins; otherwise the title is derived from
    the filename. When *parent_title* is given it is prepended as
    ``<parent>: <title>`` — unless the title already starts with the
    parent's name, which would double the prefix.
    """
    title = extract_h1(md_content) or title_from_filename(filename)
    if parent_title and not title.startswith(parent_title):
        return f"{parent_title}: {title}"
    return title
# ---------------------------------------------------------------------------
# Sync timestamp injection (Confluence copy only — local files untouched)
# ---------------------------------------------------------------------------
# Matches a previously injected "Last synced" callout line.
_SYNC_RE = re.compile(r"> \*\*Last synced to Confluence\*\*:.*")


def inject_sync_timestamp(md_content: str, sync_date: str) -> str:
    """Return *md_content* with a "Last synced" blockquote for *sync_date*.

    An existing callout is rewritten in place. Otherwise a new one is
    inserted (surrounded by blank lines) after the YAML front-matter
    block if one opens the document, after a leading H1, or else at the
    very top.
    """
    stamp = f"> **Last synced to Confluence**: {sync_date}"
    if _SYNC_RE.search(md_content):
        return _SYNC_RE.sub(stamp, md_content)

    lines = md_content.split("\n")
    position = 0
    if lines and lines[0].strip() == "---":
        # Front-matter: insert just past the closing "---" fence.
        for idx in range(1, len(lines)):
            if lines[idx].strip() == "---":
                position = idx + 1
                break
    elif lines and lines[0].startswith("# "):
        position = 1
    lines[position:position] = ["", stamp, ""]
    return "\n".join(lines)
# ---------------------------------------------------------------------------
# Confluence REST API v1 client
# ---------------------------------------------------------------------------
class ConfluenceClient:
    """Thin wrapper around the Confluence Cloud REST API v1.
    Uses Basic Auth (email + API token) with X-Atlassian-Token header,
    which is required by some Confluence Cloud instances that block v2
    or enforce XSRF protection.
    """

    def __init__(self, base_url: str, email: str, api_token: str):
        # Strip any trailing slash so base_url + path never doubles one.
        self.base_url = base_url.rstrip("/")
        self.session = requests.Session()
        # Pre-compute the Basic credential once; the session re-sends it
        # on every request.
        cred = base64.b64encode(f"{email}:{api_token}".encode()).decode()
        self.session.headers.update({
            "Authorization": f"Basic {cred}",
            "X-Atlassian-Token": "no-check",  # opt out of XSRF checking
            "Content-Type": "application/json",
            "Accept": "application/json",
        })

    # -- low-level helpers ---------------------------------------------------
    def _request(self, method: str, path: str, **kwargs) -> requests.Response:
        """Make a request with basic retry on 429 / 5xx.

        Up to 4 attempts total. A 429 sleeps for the Retry-After header
        value (default 5s) and retries; a 5xx backs off exponentially
        (1s, 2s, 4s) except on the last attempt. Any other error status
        raises requests.HTTPError immediately via raise_for_status().
        """
        url = f"{self.base_url}{path}"
        for attempt in range(4):
            resp = self.session.request(method, url, **kwargs)
            if resp.status_code == 429:
                wait = int(resp.headers.get("Retry-After", 5))
                print(f" Rate-limited, waiting {wait}s …")
                time.sleep(wait)
                continue
            if resp.status_code >= 500 and attempt < 3:
                time.sleep(2 ** attempt)  # exponential backoff
                continue
            resp.raise_for_status()
            return resp
        # Only reachable when every attempt was rate-limited (429).
        resp.raise_for_status()  # final attempt — let it raise
        return resp  # unreachable, keeps type-checkers happy

    # -- page operations -----------------------------------------------------
    def get_page(self, page_id: str) -> dict:
        """Fetch page metadata including current version number."""
        return self._request(
            "GET", f"/rest/api/content/{page_id}",
            params={"expand": "version"},
        ).json()

    def create_page(
        self, *, space_key: str, parent_id: str, title: str, body: str,
    ) -> dict:
        """Create a new page titled *title* under *parent_id* in *space_key*.

        *body* must already be Confluence storage-format XHTML. Returns
        the API's JSON for the new page (its "id" key is the page id).
        """
        payload = {
            "type": "page",
            "title": title,
            "space": {"key": space_key},
            "ancestors": [{"id": parent_id}],
            "body": {
                "storage": {
                    "value": body,
                    "representation": "storage",
                },
            },
        }
        return self._request("POST", "/rest/api/content", json=payload).json()

    def update_page(
        self, *, page_id: str, title: str, body: str, version_msg: str = "",
    ) -> dict:
        """Replace a page's title and body, bumping the version number.

        Read-modify-write: GETs the current version, then PUTs version+1
        with *version_msg* as the version comment.
        NOTE(review): not safe against a concurrent editor bumping the
        version between the GET and the PUT — Confluence would reject
        the PUT with a conflict; confirm this is acceptable here.
        """
        current = self.get_page(page_id)
        next_ver = current["version"]["number"] + 1
        payload = {
            "type": "page",
            "title": title,
            "body": {
                "storage": {
                    "value": body,
                    "representation": "storage",
                },
            },
            "version": {"number": next_ver, "message": version_msg},
        }
        return self._request(
            "PUT", f"/rest/api/content/{page_id}", json=payload,
        ).json()
# ---------------------------------------------------------------------------
# URL builder
# ---------------------------------------------------------------------------
def page_url(base_url: str, space_key: str, page_id: str, title: str) -> str:
    """Compose the human-readable URL for a Confluence page.

    Spaces in the title become '+' (kept un-escaped); every other
    URL-unsafe character is percent-encoded.
    """
    slug = title.replace(" ", "+")
    encoded = quote(slug, safe="+")
    return f"{base_url}/spaces/{space_key}/pages/{page_id}/{encoded}"
# ---------------------------------------------------------------------------
# Core sync logic
# ---------------------------------------------------------------------------
def sync_file(
    client: ConfluenceClient,
    md_path: Path,
    mapping: dict,
    *,
    dry_run: bool = False,
) -> dict | None:
    """Sync one markdown file to Confluence.

    Pages already recorded in ``mapping["pages"]`` are updated in place;
    anything else is created as a child of ``mapping["parentPage"]``.
    Returns a page-info dict ({"pageId", "title", "url"}) on success,
    the existing entry on an update dry-run, or None on HTTP failure.
    In dry-run mode no API calls are made.
    """
    filename = md_path.name
    cfg = mapping["confluence"]
    parent = mapping["parentPage"]
    pages = mapping["pages"]
    existing = pages.get(filename)
    today = date.today().isoformat()

    md_content = md_path.read_text(encoding="utf-8")
    # The sync timestamp goes into the Confluence copy only — the local
    # file is never rewritten.
    md_for_confluence = inject_sync_timestamp(md_content, today)
    storage_body = md_to_storage(md_for_confluence)

    # Resolve title — keep the recorded title for already-mapped pages
    # so a rename made on the Confluence side is not clobbered.
    if existing:
        title = existing["title"]
    else:
        title = resolve_title(filename, md_content, parent.get("title"))

    base = cfg.get("baseUrl", "")
    space_key = cfg.get("spaceKey", "")

    # -- update existing page ------------------------------------------------
    if existing:
        pid = existing["pageId"]
        if dry_run:
            # BUG FIX: these progress lines printed the literal
            # "(unknown)" instead of interpolating the filename.
            print(f" [dry-run] update {filename} (page {pid})")
            return existing
        try:
            client.update_page(
                page_id=pid,
                title=title,
                body=storage_body,
                version_msg=f"Synced from local docs {today}",
            )
            url = page_url(base, space_key, pid, title)
            print(f" updated {filename}")
            return {"pageId": pid, "title": title, "url": url}
        except requests.HTTPError as exc:
            _report_error("update", filename, exc)
            return None

    # -- create new page -----------------------------------------------------
    if dry_run:
        print(f" [dry-run] create {filename}: {title}")
        return {"pageId": "DRY_RUN", "title": title, "url": ""}
    try:
        result = client.create_page(
            space_key=cfg["spaceKey"],
            parent_id=parent["id"],
            title=title,
            body=storage_body,
        )
        pid = result["id"]
        url = page_url(base, space_key, pid, title)
        print(f" created {filename} (page {pid})")
        return {"pageId": pid, "title": title, "url": url}
    except requests.HTTPError as exc:
        _report_error("create", filename, exc)
        return None
def _report_error(verb: str, filename: str, exc: requests.HTTPError) -> None:
    """Print a concise failure report for a create/update attempt.

    *verb* is "create" or "update"; the first 500 characters of the API
    response body are included when a response is attached.
    """
    # BUG FIX: the filename parameter was accepted but never printed —
    # the message showed the literal "(unknown)" instead.
    print(f" FAILED {verb} {filename}: {exc}")
    if exc.response is not None:
        body = exc.response.text[:500]  # trim potentially huge HTML bodies
        print(f" {body}")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def build_parser() -> argparse.ArgumentParser:
    """Construct the CLI argument parser for the sync script."""
    parser = argparse.ArgumentParser(
        description="Sync markdown docs to Confluence Cloud.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
environment variables
CONFLUENCE_EMAIL Atlassian account email
CONFLUENCE_API_TOKEN_WRITE Atlassian API token (write-scoped)
CONFLUENCE_API_TOKEN Fallback if _WRITE is not set
CONFLUENCE_BASE_URL Wiki base URL (overrides mapping file)
examples
%(prog)s # sync all docs
%(prog)s --dry-run # preview without changes
%(prog)s --file docs/my-doc.md # sync one file
%(prog)s --update-only # only update existing pages
""",
    )
    # (flag, kwargs) table keeps the option definitions easy to scan;
    # order matters only for --help output.
    option_table = [
        ("--docs-dir", dict(
            type=Path,
            help="Docs directory (default: inferred from mapping file location)")),
        ("--mapping-file", dict(
            type=Path,
            help="Path to .confluence-mapping.json (default: auto-detect)")),
        ("--file", dict(
            type=Path, dest="single_file",
            help="Sync a single file instead of all docs")),
        ("--dry-run", dict(
            action="store_true",
            help="Show what would happen without making API calls")),
        ("--create-only", dict(
            action="store_true",
            help="Only create new pages (skip existing)")),
        ("--update-only", dict(
            action="store_true",
            help="Only update existing pages (skip new)")),
    ]
    for flag, kwargs in option_table:
        parser.add_argument(flag, **kwargs)
    return parser
def resolve_base_url(cfg: dict) -> str | None:
"""Derive the Confluence base URL from env or mapping config."""
from_env = os.environ.get("CONFLUENCE_BASE_URL")
if from_env:
return from_env.rstrip("/")
from_cfg = cfg.get("baseUrl")
if from_cfg:
return from_cfg.rstrip("/")
# cloudId might be a domain like "discoverorg.atlassian.net"
cloud_id = cfg.get("cloudId", "")
if "." in cloud_id:
return f"https://{cloud_id}/wiki"
return None
def main() -> None:
    """CLI entry point: discover config, sync markdown files, persist mapping.

    Exits 1 on missing configuration or any failed sync; exits 0 when
    there is nothing to do or everything succeeded.
    """
    parser = build_parser()
    args = parser.parse_args()
    # -- discover paths ------------------------------------------------------
    # Anchor the mapping-file search at the git repo root, else the CWD.
    repo_root = find_repo_root() or Path.cwd()
    if args.mapping_file:
        mapping_path = args.mapping_file.resolve()
    else:
        mapping_path = find_mapping_file(repo_root)
    if not mapping_path or not mapping_path.is_file():
        print("ERROR: cannot find .confluence-mapping.json")
        print(" Pass --mapping-file or run from within the project.")
        sys.exit(1)
    # Default docs dir is wherever the mapping file lives.
    docs_dir = args.docs_dir.resolve() if args.docs_dir else mapping_path.parent
    print(f"mapping: {mapping_path}")
    print(f"docs dir: {docs_dir}")
    # -- load config ---------------------------------------------------------
    mapping = load_mapping(mapping_path)
    cfg = mapping["confluence"]
    email = os.environ.get("CONFLUENCE_EMAIL", "")
    # Prefer write-scoped token, fall back to general token
    token = (os.environ.get("CONFLUENCE_API_TOKEN_WRITE")
             or os.environ.get("CONFLUENCE_API_TOKEN", ""))
    base_url = resolve_base_url(cfg)
    if not email or not token:
        print("ERROR: CONFLUENCE_EMAIL and CONFLUENCE_API_TOKEN_WRITE must be set.")
        print(" https://id.atlassian.com/manage-profile/security/api-tokens")
        sys.exit(1)
    if not base_url:
        print("ERROR: cannot determine Confluence base URL.")
        print(" Set CONFLUENCE_BASE_URL or add baseUrl to the mapping file.")
        sys.exit(1)
    # Ensure baseUrl is persisted so page_url() works
    # NOTE(review): setdefault will NOT overwrite an existing cfg["baseUrl"],
    # so a CONFLUENCE_BASE_URL env override changes the API client's URL but
    # not the page URLs written to the mapping — confirm this is intended.
    cfg.setdefault("baseUrl", base_url)
    client = ConfluenceClient(base_url, email, token)
    # -- collect files -------------------------------------------------------
    if args.single_file:
        target = args.single_file.resolve()
        if not target.is_file():
            print(f"ERROR: file not found: {target}")
            sys.exit(1)
        md_files = [target]
    else:
        # Top-level *.md files only (no recursion), skipping dotfiles.
        md_files = sorted(
            p for p in docs_dir.glob("*.md")
            if not p.name.startswith(".")
        )
    if not md_files:
        print("No markdown files found.")
        sys.exit(0)
    pages = mapping["pages"]
    if args.create_only:
        md_files = [f for f in md_files if f.name not in pages]
    elif args.update_only:
        md_files = [f for f in md_files if f.name in pages]
    total = len(md_files)
    mode = "dry-run" if args.dry_run else "live"
    print(f"\n{total} file(s) to sync ({mode})\n")
    # -- sync ----------------------------------------------------------------
    created = updated = failed = 0
    for i, md_path in enumerate(md_files, 1):
        filename = md_path.name
        # Checked before sync_file() runs, since a successful sync adds
        # the file to `pages` below.
        is_new = filename not in pages
        # NOTE(review): `prefix` is built but never printed — it looks like
        # it was meant to prefix the per-file output; confirm.
        prefix = f"[{i}/{total}]"
        result = sync_file(client, md_path, mapping, dry_run=args.dry_run)
        if result:
            if not args.dry_run:
                pages[filename] = result
            if is_new:
                created += 1
            else:
                updated += 1
        else:
            failed += 1
    # -- persist mapping -----------------------------------------------------
    # Only write the mapping back after a live run that changed something.
    if not args.dry_run and (created or updated):
        mapping["lastSynced"] = date.today().isoformat()
        # Clean synced files out of the unmapped list
        synced = {f.name for f in md_files}
        mapping["unmapped"] = [u for u in mapping.get("unmapped", []) if u not in synced]
        save_mapping(mapping_path, mapping)
        print(f"\nmapping file updated")
    # -- summary -------------------------------------------------------------
    print(f"\ndone: {created} created · {updated} updated · {failed} failed")
    if failed:
        sys.exit(1)
# Script entry point — run only when executed directly, not on import.
if __name__ == "__main__":
    main()