minor updates, includes new skills for just-ship-it and push to proof
Some checks failed
CI / test (push) Has been cancelled
Some checks failed
CI / test (push) Has been cancelled
This commit is contained in:
@@ -0,0 +1,529 @@
|
||||
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = ["markdown", "requests", "truststore"]
# ///
"""Sync markdown docs to Confluence Cloud.

Reads a .confluence-mapping.json file, syncs local markdown files
to Confluence pages via the REST API (the v1 content endpoints —
see ConfluenceClient), and updates the mapping file.

Run with: uv run scripts/sync_confluence.py [options]
"""
|
||||
|
||||
import argparse
import base64
import json
import os
import re
import subprocess
import sys
import time
from datetime import date, timezone, datetime
from pathlib import Path
from urllib.parse import quote

import truststore

# Inject the OS trust store into ssl *before* requests is imported, so
# HTTPS verification uses system certificates (e.g. behind corporate
# proxies with custom CAs).
truststore.inject_into_ssl()

import markdown
import requests
||||
# ---------------------------------------------------------------------------
|
||||
# Path discovery
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def find_repo_root() -> Path | None:
|
||||
"""Walk up from CWD to find a git repo root."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "rev-parse", "--show-toplevel"],
|
||||
capture_output=True, text=True, check=True,
|
||||
)
|
||||
return Path(result.stdout.strip())
|
||||
except (subprocess.CalledProcessError, FileNotFoundError):
|
||||
return None
|
||||
|
||||
|
||||
def find_mapping_file(start: Path) -> Path | None:
|
||||
"""Search for .confluence-mapping.json walking up from *start*.
|
||||
|
||||
Checks <dir>/docs/.confluence-mapping.json and
|
||||
<dir>/.confluence-mapping.json at each level.
|
||||
"""
|
||||
current = start.resolve()
|
||||
while True:
|
||||
for candidate in (
|
||||
current / "docs" / ".confluence-mapping.json",
|
||||
current / ".confluence-mapping.json",
|
||||
):
|
||||
if candidate.is_file():
|
||||
return candidate
|
||||
parent = current.parent
|
||||
if parent == current:
|
||||
break
|
||||
current = parent
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Mapping file helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def load_mapping(path: Path) -> dict:
    """Read the mapping JSON and lightly validate its structure.

    Raises:
        ValueError: when a required top-level key is absent.
    """
    data = json.loads(path.read_text(encoding="utf-8"))
    for required in ("confluence", "parentPage"):
        if required not in data:
            raise ValueError(f"Mapping file missing required key: '{required}'")
    # Optional sections default to empty so callers can index freely.
    data.setdefault("pages", {})
    data.setdefault("unmapped", [])
    return data
|
||||
|
||||
|
||||
def save_mapping(path: Path, data: dict) -> None:
    """Serialize *data* as 2-space-indented JSON with a trailing newline."""
    serialized = json.dumps(data, indent=2)
    path.write_text(serialized + "\n", encoding="utf-8")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Markdown → Confluence storage format
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Markdown extensions enabled when converting docs for Confluence.
MD_EXTENSIONS = [
    "markdown.extensions.tables",
    "markdown.extensions.fenced_code",
    "markdown.extensions.toc",
    "markdown.extensions.md_in_html",
    "markdown.extensions.sane_lists",
]

# Per-extension configuration. Permalink anchors are disabled —
# presumably to keep heading-anchor markup out of the Confluence copy.
MD_EXTENSION_CONFIGS: dict = {
    "markdown.extensions.toc": {"permalink": False},
}
|
||||
|
||||
|
||||
def md_to_storage(md_content: str) -> str:
    """Render *md_content* as XHTML for Confluence storage format."""
    render_options = {
        "extensions": MD_EXTENSIONS,
        "extension_configs": MD_EXTENSION_CONFIGS,
        "output_format": "xhtml",
    }
    return markdown.markdown(md_content, **render_options)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Title helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def extract_h1(md_content: str) -> str | None:
|
||||
"""Return the first ``# Heading`` from *md_content*, or None."""
|
||||
for line in md_content.splitlines():
|
||||
stripped = line.strip()
|
||||
if stripped.startswith("# ") and not stripped.startswith("## "):
|
||||
return stripped[2:].strip()
|
||||
return None
|
||||
|
||||
|
||||
def title_from_filename(filename: str) -> str:
    """Turn a kebab-case markdown filename into a display title.

    Each hyphen-separated word is capitalised, then known acronyms and
    product terms are restored to their canonical casing.
    """
    canonical = {
        "Ats": "ATS", "Api": "API", "Ms": "MS", "Unie": "UNIE",
        "Id": "ID", "Opa": "OPA", "Zi": "ZI", "Cql": "CQL",
        "Jql": "JQL", "Sdk": "SDK", "Oauth": "OAuth", "Cdn": "CDN",
        "Aws": "AWS", "Gcp": "GCP", "Grpc": "gRPC",
    }
    words = filename.removesuffix(".md").split("-")
    title = " ".join(word.capitalize() for word in words)
    # One pass over the title: any whole-word key is swapped for its
    # canonical spelling.
    pattern = re.compile(r"\b(" + "|".join(canonical) + r")\b")
    return pattern.sub(lambda m: canonical[m.group(1)], title)
|
||||
|
||||
|
||||
def resolve_title(filename: str, md_content: str, parent_title: str | None) -> str:
    """Choose the Confluence page title for *filename*.

    Preference order: the document's own H1, then a title derived from
    the filename. A parent title, when given, is prepended as
    ``<parent>: <title>`` unless the title already carries that prefix.
    """
    title = extract_h1(md_content) or title_from_filename(filename)
    # Guard against double-prefixing on repeated syncs.
    if parent_title and not title.startswith(parent_title):
        title = f"{parent_title}: {title}"
    return title
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Sync timestamp injection (Confluence copy only — local files untouched)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Matches an existing sync-timestamp callout line (any date suffix).
_SYNC_RE = re.compile(r"> \*\*Last synced to Confluence\*\*:.*")


def inject_sync_timestamp(md_content: str, sync_date: str) -> str:
    """Return *md_content* with the sync-timestamp callout set to *sync_date*.

    An existing callout is rewritten in place. Otherwise a fresh one is
    inserted — after YAML front-matter when present, else after a
    leading H1, else at the very top — padded with blank lines. Only
    the returned copy is stamped; callers keep local files untouched.
    """
    stamp = f"> **Last synced to Confluence**: {sync_date}"

    if _SYNC_RE.search(md_content):
        return _SYNC_RE.sub(stamp, md_content)

    lines = md_content.split("\n")

    position = 0
    if lines and lines[0].strip() == "---":
        # Skip past the closing front-matter fence.
        for idx in range(1, len(lines)):
            if lines[idx].strip() == "---":
                position = idx + 1
                break
    elif lines and lines[0].startswith("# "):
        position = 1

    lines[position:position] = ["", stamp, ""]
    return "\n".join(lines)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Confluence REST API v1 client
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class ConfluenceClient:
    """Minimal client for the Confluence Cloud REST API v1.

    Authenticates with Basic Auth (email + API token) and sends the
    X-Atlassian-Token header, which is required by some Confluence
    Cloud instances that block v2 or enforce XSRF protection.
    """

    def __init__(self, base_url: str, email: str, api_token: str):
        self.base_url = base_url.rstrip("/")
        self.session = requests.Session()
        token = base64.b64encode(f"{email}:{api_token}".encode()).decode()
        self.session.headers.update({
            "Authorization": f"Basic {token}",
            "X-Atlassian-Token": "no-check",
            "Content-Type": "application/json",
            "Accept": "application/json",
        })

    # -- low-level helpers ---------------------------------------------------

    def _request(self, method: str, path: str, **kwargs) -> requests.Response:
        """Issue one HTTP request, retrying on 429 and transient 5xx.

        Up to four attempts total: 429 waits the server-specified
        Retry-After (default 5s); 5xx backs off exponentially. The
        final attempt raises via raise_for_status.
        """
        url = f"{self.base_url}{path}"
        last_resp = None
        for attempt in range(4):
            last_resp = self.session.request(method, url, **kwargs)
            status = last_resp.status_code
            if status == 429:
                wait = int(last_resp.headers.get("Retry-After", 5))
                print(f" Rate-limited, waiting {wait}s …")
                time.sleep(wait)
            elif status >= 500 and attempt < 3:
                time.sleep(2 ** attempt)
            else:
                last_resp.raise_for_status()
                return last_resp
        last_resp.raise_for_status()  # retries exhausted — surface the error
        return last_resp  # unreachable, keeps type-checkers happy

    # -- page operations -----------------------------------------------------

    def get_page(self, page_id: str) -> dict:
        """Fetch page metadata, expanded with its current version number."""
        resp = self._request(
            "GET",
            f"/rest/api/content/{page_id}",
            params={"expand": "version"},
        )
        return resp.json()

    def create_page(
        self, *, space_key: str, parent_id: str, title: str, body: str,
    ) -> dict:
        """Create a child page under *parent_id*; returns the API response."""
        storage = {"value": body, "representation": "storage"}
        payload = {
            "type": "page",
            "title": title,
            "space": {"key": space_key},
            "ancestors": [{"id": parent_id}],
            "body": {"storage": storage},
        }
        return self._request("POST", "/rest/api/content", json=payload).json()

    def update_page(
        self, *, page_id: str, title: str, body: str, version_msg: str = "",
    ) -> dict:
        """Publish a new version of *page_id* with the given title/body.

        Reads the current version first — Confluence requires the next
        version number in the update payload.
        """
        version_now = self.get_page(page_id)["version"]["number"]
        storage = {"value": body, "representation": "storage"}
        payload = {
            "type": "page",
            "title": title,
            "body": {"storage": storage},
            "version": {"number": version_now + 1, "message": version_msg},
        }
        resp = self._request(
            "PUT", f"/rest/api/content/{page_id}", json=payload,
        )
        return resp.json()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# URL builder
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def page_url(base_url: str, space_key: str, page_id: str, title: str) -> str:
    """Compose the browsable URL for a Confluence page.

    Spaces in *title* become '+' (left unescaped); everything else
    URL-unsafe is percent-encoded.
    """
    slug = quote(title.replace(" ", "+"), safe="+")
    return "/".join([base_url, "spaces", space_key, "pages", page_id, slug])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Core sync logic
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def sync_file(
    client: ConfluenceClient,
    md_path: Path,
    mapping: dict,
    *,
    dry_run: bool = False,
) -> dict | None:
    """Sync one markdown file. Returns page-info dict or None on failure.

    Args:
        client: authenticated Confluence API client.
        md_path: local markdown file to push.
        mapping: parsed .confluence-mapping.json data; an entry in
            ``mapping["pages"]`` makes this an update, otherwise a create.
        dry_run: when True, report the intended action without API calls.
    """
    filename = md_path.name
    cfg = mapping["confluence"]
    parent = mapping["parentPage"]
    pages = mapping["pages"]
    existing = pages.get(filename)
    today = date.today().isoformat()

    md_content = md_path.read_text(encoding="utf-8")
    # Stamp the Confluence copy only — the local file is left untouched.
    md_for_confluence = inject_sync_timestamp(md_content, today)
    storage_body = md_to_storage(md_for_confluence)

    # Resolve title — keep existing title for already-mapped pages, so a
    # changed H1 does not silently rename the Confluence page.
    if existing:
        title = existing["title"]
    else:
        title = resolve_title(filename, md_content, parent.get("title"))

    base = cfg.get("baseUrl", "")
    space_key = cfg.get("spaceKey", "")

    # -- update existing page ------------------------------------------------
    if existing:
        pid = existing["pageId"]
        if dry_run:
            # Fix: report the actual filename instead of a literal "(unknown)".
            print(f" [dry-run] update {filename} (page {pid})")
            return existing
        try:
            client.update_page(
                page_id=pid,
                title=title,
                body=storage_body,
                version_msg=f"Synced from local docs {today}",
            )
            url = page_url(base, space_key, pid, title)
            print(f" updated {filename}")
            return {"pageId": pid, "title": title, "url": url}
        except requests.HTTPError as exc:
            _report_error("update", filename, exc)
            return None

    # -- create new page -----------------------------------------------------
    if dry_run:
        print(f" [dry-run] create {filename} → {title}")
        return {"pageId": "DRY_RUN", "title": title, "url": ""}
    try:
        result = client.create_page(
            space_key=cfg["spaceKey"],
            parent_id=parent["id"],
            title=title,
            body=storage_body,
        )
        pid = result["id"]
        url = page_url(base, space_key, pid, title)
        print(f" created {filename} (page {pid})")
        return {"pageId": pid, "title": title, "url": url}
    except requests.HTTPError as exc:
        _report_error("create", filename, exc)
        return None
|
||||
|
||||
|
||||
def _report_error(verb: str, filename: str, exc: requests.HTTPError) -> None:
    """Print a failure notice for *filename* with the HTTP error detail.

    Fix: the *filename* parameter was previously unused and the message
    printed a literal "(unknown)"; it now names the failing file. The
    response body is truncated to 500 characters so a large HTML error
    page does not flood the console.
    """
    print(f" FAILED {verb} {filename}: {exc}")
    if exc.response is not None:
        body = exc.response.text[:500]
        print(f" {body}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def build_parser() -> argparse.ArgumentParser:
    """Construct the command-line interface for the sync script."""
    epilog = """
environment variables
CONFLUENCE_EMAIL Atlassian account email
CONFLUENCE_API_TOKEN_WRITE Atlassian API token (write-scoped)
CONFLUENCE_API_TOKEN Fallback if _WRITE is not set
CONFLUENCE_BASE_URL Wiki base URL (overrides mapping file)

examples
%(prog)s # sync all docs
%(prog)s --dry-run # preview without changes
%(prog)s --file docs/my-doc.md # sync one file
%(prog)s --update-only # only update existing pages
"""
    parser = argparse.ArgumentParser(
        description="Sync markdown docs to Confluence Cloud.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=epilog,
    )
    parser.add_argument(
        "--docs-dir",
        type=Path,
        help="Docs directory (default: inferred from mapping file location)",
    )
    parser.add_argument(
        "--mapping-file",
        type=Path,
        help="Path to .confluence-mapping.json (default: auto-detect)",
    )
    parser.add_argument(
        "--file",
        type=Path,
        dest="single_file",
        help="Sync a single file instead of all docs",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would happen without making API calls",
    )
    parser.add_argument(
        "--create-only",
        action="store_true",
        help="Only create new pages (skip existing)",
    )
    parser.add_argument(
        "--update-only",
        action="store_true",
        help="Only update existing pages (skip new)",
    )
    return parser
|
||||
|
||||
|
||||
def resolve_base_url(cfg: dict) -> str | None:
|
||||
"""Derive the Confluence base URL from env or mapping config."""
|
||||
from_env = os.environ.get("CONFLUENCE_BASE_URL")
|
||||
if from_env:
|
||||
return from_env.rstrip("/")
|
||||
from_cfg = cfg.get("baseUrl")
|
||||
if from_cfg:
|
||||
return from_cfg.rstrip("/")
|
||||
# cloudId might be a domain like "discoverorg.atlassian.net"
|
||||
cloud_id = cfg.get("cloudId", "")
|
||||
if "." in cloud_id:
|
||||
return f"https://{cloud_id}/wiki"
|
||||
return None
|
||||
|
||||
|
||||
def main() -> None:
    """Entry point: find paths, validate config, sync docs, save mapping.

    Exits 1 on missing configuration or any per-file failure; exits 0
    when there is nothing to do.
    """
    parser = build_parser()
    args = parser.parse_args()

    # -- discover paths ------------------------------------------------------
    repo_root = find_repo_root() or Path.cwd()

    if args.mapping_file:
        mapping_path = args.mapping_file.resolve()
    else:
        mapping_path = find_mapping_file(repo_root)
    if not mapping_path or not mapping_path.is_file():
        print("ERROR: cannot find .confluence-mapping.json")
        print(" Pass --mapping-file or run from within the project.")
        sys.exit(1)

    # Docs default to the directory holding the mapping file.
    docs_dir = args.docs_dir.resolve() if args.docs_dir else mapping_path.parent
    print(f"mapping: {mapping_path}")
    print(f"docs dir: {docs_dir}")

    # -- load config ---------------------------------------------------------
    mapping = load_mapping(mapping_path)
    cfg = mapping["confluence"]

    email = os.environ.get("CONFLUENCE_EMAIL", "")
    # Prefer write-scoped token, fall back to general token
    token = (os.environ.get("CONFLUENCE_API_TOKEN_WRITE")
             or os.environ.get("CONFLUENCE_API_TOKEN", ""))
    base_url = resolve_base_url(cfg)

    if not email or not token:
        print("ERROR: CONFLUENCE_EMAIL and CONFLUENCE_API_TOKEN_WRITE must be set.")
        print(" https://id.atlassian.com/manage-profile/security/api-tokens")
        sys.exit(1)
    if not base_url:
        print("ERROR: cannot determine Confluence base URL.")
        print(" Set CONFLUENCE_BASE_URL or add baseUrl to the mapping file.")
        sys.exit(1)

    # Ensure baseUrl is persisted so page_url() works
    cfg.setdefault("baseUrl", base_url)

    client = ConfluenceClient(base_url, email, token)

    # -- collect files -------------------------------------------------------
    if args.single_file:
        target = args.single_file.resolve()
        if not target.is_file():
            print(f"ERROR: file not found: {target}")
            sys.exit(1)
        md_files = [target]
    else:
        # Dotfiles (editor backups etc.) are never synced.
        md_files = sorted(
            p for p in docs_dir.glob("*.md")
            if not p.name.startswith(".")
        )
    if not md_files:
        print("No markdown files found.")
        sys.exit(0)

    pages = mapping["pages"]
    if args.create_only:
        md_files = [f for f in md_files if f.name not in pages]
    elif args.update_only:
        md_files = [f for f in md_files if f.name in pages]

    total = len(md_files)
    mode = "dry-run" if args.dry_run else "live"
    print(f"\n{total} file(s) to sync ({mode})\n")

    # -- sync ----------------------------------------------------------------
    created = updated = failed = 0
    for i, md_path in enumerate(md_files, 1):
        filename = md_path.name
        # Record newness before the result is written back into `pages`.
        is_new = filename not in pages
        # NOTE(review): prefix is currently unused — presumably intended
        # for per-file progress output; confirm before removing.
        prefix = f"[{i}/{total}]"

        result = sync_file(client, md_path, mapping, dry_run=args.dry_run)
        if result:
            # Dry-run results count in the summary but never mutate the
            # mapping.
            if not args.dry_run:
                pages[filename] = result
            if is_new:
                created += 1
            else:
                updated += 1
        else:
            failed += 1

    # -- persist mapping -----------------------------------------------------
    if not args.dry_run and (created or updated):
        mapping["lastSynced"] = date.today().isoformat()
        # Clean synced files out of the unmapped list
        synced = {f.name for f in md_files}
        mapping["unmapped"] = [u for u in mapping.get("unmapped", []) if u not in synced]
        save_mapping(mapping_path, mapping)
        print(f"\nmapping file updated")

    # -- summary -------------------------------------------------------------
    print(f"\ndone: {created} created · {updated} updated · {failed} failed")
    if failed:
        sys.exit(1)
|
||||
|
||||
|
||||
# Script entry point when executed directly (e.g. via `uv run`).
if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user