Files
claude-engineering-plugin/plugins/compound-engineering/skills/ce-demo-reel/scripts/capture-demo.py
2026-04-09 21:29:51 -07:00

654 lines
23 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Evidence capture pipeline — deterministic helpers for the demo-reel skill.
Subcommands:
preflight Check tool availability (JSON output)
detect [--repo-root PATH] Detect project type from manifests (JSON output)
recommend --project-type T --change-type T --tools JSON Recommend capture tier (JSON output)
stitch [--duration N] OUTPUT FRAME [FRAME ...] Stitch frames into animated GIF
screenshot-reel --output OUT [--duration N] [--lang L] [--theme T] --text F [F ...] Render text frames via silicon + stitch
terminal-recording --output OUT --tape TAPE Run VHS tape file
upload FILE Upload to catbox.moe (retries once)
"""
import argparse
import json
import os
import shutil
import subprocess
import sys
import tempfile
import time
from pathlib import Path
# --- Config ---
# Hard ceiling for a stitched GIF, in bytes. _stitch_frames re-stitches with
# fewer frames when the encoded file is larger than this.
MAX_GIF_SIZE = 10 * 1024 * 1024 # 10 MB — GitHub inline render limit
# Softer goal, in bytes. Exceeding it only produces an informational note.
TARGET_GIF_SIZE = 5 * 1024 * 1024 # 5 MB — preferred target
# Endpoint that cmd_upload posts the file to (multipart form via curl).
CATBOX_API = "https://catbox.moe/user/api.php"
# --- Helpers ---
def die(msg):
    """Report a fatal error on stderr and stop the process with status 1.

    Args:
        msg: Human-readable description; written with an ``ERROR: `` prefix.

    Raises:
        SystemExit: Always, with exit code 1.
    """
    sys.stderr.write(f"ERROR: {msg}\n")
    raise SystemExit(1)
def check_tool(name):
    """Tell whether an executable called *name* can be located.

    Args:
        name: Program name (or path) handed to ``shutil.which``.

    Returns:
        ``True`` when ``shutil.which`` resolves it, ``False`` otherwise.
    """
    resolved = shutil.which(name)
    return resolved is not None
def run_cmd(cmd, timeout=120):
    """Run an external command and capture its output as text.

    Timeouts and launch failures are reported as failed results instead of
    exceptions, so callers only ever need to inspect ``returncode``.

    Args:
        cmd: Argument vector (program followed by its arguments).
        timeout: Seconds to wait before the command is abandoned.

    Returns:
        A ``subprocess.CompletedProcess``. On timeout a synthetic result with
        ``returncode=1`` is returned; when the program cannot be started at
        all (missing binary, permission denied) a synthetic result with
        ``returncode=127`` is returned. Every failure is also logged to stderr.
    """
    # str() each part so a pathlib.Path in the argv cannot break the log line.
    rendered = " ".join(str(part) for part in cmd)
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout, check=False)
    except subprocess.TimeoutExpired:
        print(f"ERROR: Command timed out after {timeout}s: {rendered}", file=sys.stderr)
        return subprocess.CompletedProcess(cmd, returncode=1, stdout="", stderr=f"Timed out after {timeout}s")
    except OSError as exc:
        # subprocess.run raises (e.g. FileNotFoundError) when the executable
        # cannot be launched; surface that as an ordinary failed result.
        print(f"ERROR: Could not start command: {rendered}: {exc}", file=sys.stderr)
        return subprocess.CompletedProcess(cmd, returncode=127, stdout="", stderr=str(exc))
    if result.returncode != 0:
        print(f"ERROR: Command failed (exit {result.returncode}): {rendered}", file=sys.stderr)
        if result.stderr:
            print(result.stderr.strip(), file=sys.stderr)
    return result
def file_size_mb(path):
    """Return the size of the file at *path* in megabytes (bytes / 1024 / 1024).

    Args:
        path: Location of an existing file.

    Returns:
        The size as a float.
    """
    bytes_per_mb = 1024 * 1024
    size_in_bytes = Path(path).stat().st_size
    return size_in_bytes / bytes_per_mb
# --- Preflight ---
def cmd_preflight(_args):
    """Print a JSON object reporting which capture tools are installed.

    Keys are the tool names with ``-`` replaced by ``_``; values are booleans
    from ``check_tool``.

    Args:
        _args: Parsed CLI namespace (unused).
    """
    binaries = ("agent-browser", "vhs", "silicon", "ffmpeg", "ffprobe")
    report = {binary.replace("-", "_"): check_tool(binary) for binary in binaries}
    print(json.dumps(report))
# --- Detect ---
# Lookup tables consumed by _detect_project_type.
#
# npm package names whose presence in package.json (dependencies or
# devDependencies) classifies the project as an Electron desktop app.
ELECTRON_DEPS = {"electron", "electron-builder", "electron-forge", "electron-vite", "electron-packager"}
# npm package names treated as a web-app signal in package.json.
WEB_NODE_DEPS = {
"react", "vue", "svelte", "astro", "next", "nuxt", "@angular/core", "solid-js",
"@remix-run/react", "gatsby", "express", "fastify", "koa", "hono", "@hono/node-server",
}
# Names searched for (as plain substrings) in the Gemfile text.
WEB_RUBY_DEPS = {"rails", "sinatra", "hanami", "roda"}
# Module paths searched for (as plain substrings) in the go.mod text.
WEB_GO_DEPS = {
"github.com/gin-gonic/gin", "github.com/labstack/echo", "github.com/gofiber/fiber",
"github.com/go-chi/chi", "github.com/gorilla/mux",
}
# Note: net/http is stdlib and won't appear in go.mod. The agent detects stdlib web
# servers from source imports in the diff and overrides the classification (Step 2).
# Names searched for (as plain substrings) in pyproject.toml and requirements.txt.
WEB_PYTHON_DEPS = {"flask", "django", "fastapi", "starlette", "tornado", "sanic", "litestar"}
# Names searched for (as plain substrings) in the Cargo.toml text.
WEB_RUST_DEPS = {"actix-web", "axum", "rocket", "warp", "poem", "tide"}
# CLI-framework names searched for in the Gemfile once no web signal matched.
CLI_RUBY_DEPS = {"thor", "gli", "dry-cli"}
# CLI-framework names searched for in pyproject.toml once no web signal matched.
CLI_PYTHON_DEPS = {"click", "typer", "argparse"}
def _read_file(path):
    """Return the text content of *path*, or ``None`` if it cannot be read.

    The file is decoded as UTF-8; undecodable bytes are substituted rather
    than raising.

    Args:
        path: File location (string or ``Path``).

    Returns:
        The decoded text, or ``None`` on any ``OSError`` (missing file,
        directory, permission problem, ...).
    """
    target = Path(path)
    try:
        text = target.read_text(encoding="utf-8", errors="replace")
    except OSError:  # IOError is an alias of OSError on Python 3
        return None
    return text
def _has_any_dep(pkg_json, dep_names):
    """Check whether a parsed package.json declares any of *dep_names*.

    Both ``dependencies`` and ``devDependencies`` are consulted. Sections that
    are absent, ``null`` or otherwise not JSON objects are skipped instead of
    raising, and a manifest that is not a dict never matches.

    Args:
        pkg_json: Parsed package.json content.
        dep_names: Collection of package names to look for.

    Returns:
        ``True`` when at least one name is declared, ``False`` otherwise.
    """
    if not isinstance(pkg_json, dict):
        return False
    declared = set()
    for section in ("dependencies", "devDependencies"):
        entries = pkg_json.get(section)
        # Hand-edited manifests may carry null or a list here; only objects
        # map package names to versions.
        if isinstance(entries, dict):
            declared.update(entries)
    return not declared.isdisjoint(dep_names)
def _first_dep_in(text, dep_names):
    """Return the alphabetically first name in *dep_names* found in *text*, else None."""
    if not text:
        return None
    # Sorted so the reported name does not depend on set iteration order.
    return next((dep for dep in sorted(dep_names) if dep in text), None)


def _detect_project_type(repo_root):
    """Classify the repository at *repo_root* from its package manifests.

    Checks run in priority order and the first match wins:
    desktop app (Electron) -> web app -> CLI tool -> library -> text-only.
    Non-JSON manifests are matched by plain substring search, so a framework
    name that merely appears in a comment also counts.

    Args:
        repo_root: Directory that holds the manifests.

    Returns:
        A dict with ``"type"`` (one of ``desktop-app``, ``web-app``,
        ``cli-tool``, ``library``, ``text-only``) and a human-readable
        ``"reason"``.
    """
    root = Path(repo_root)

    # package.json feeds several checks. Anything that does not parse to a
    # JSON object (array, string, number, invalid JSON) is ignored.
    pkg_json = None
    pkg_text = _read_file(root / "package.json")
    if pkg_text:
        try:
            parsed = json.loads(pkg_text)
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            pkg_json = parsed

    # 1. Desktop app (Electron)
    if pkg_json and _has_any_dep(pkg_json, ELECTRON_DEPS):
        return {"type": "desktop-app", "reason": "package.json contains Electron dependency"}

    # 2. Web app
    # vite on its own is not a web signal (it could be anything); a project
    # combining vite with a framework is already covered by this check.
    if pkg_json and _has_any_dep(pkg_json, WEB_NODE_DEPS):
        return {"type": "web-app", "reason": "package.json contains web framework dependency"}

    gemfile = _read_file(root / "Gemfile")
    dep = _first_dep_in(gemfile, WEB_RUBY_DEPS)
    if dep:
        return {"type": "web-app", "reason": f"Gemfile contains {dep}"}

    go_mod = _read_file(root / "go.mod")
    dep = _first_dep_in(go_mod, WEB_GO_DEPS)
    if dep:
        return {"type": "web-app", "reason": f"go.mod contains {dep}"}

    # pyproject.toml is read once here and reused by the CLI checks below.
    pyproject = _read_file(root / "pyproject.toml")
    dep = _first_dep_in(pyproject, WEB_PYTHON_DEPS)
    if dep:
        return {"type": "web-app", "reason": f"pyproject.toml contains {dep}"}
    dep = _first_dep_in(_read_file(root / "requirements.txt"), WEB_PYTHON_DEPS)
    if dep:
        return {"type": "web-app", "reason": f"requirements.txt contains {dep}"}

    cargo = _read_file(root / "Cargo.toml")
    dep = _first_dep_in(cargo, WEB_RUST_DEPS)
    if dep:
        return {"type": "web-app", "reason": f"Cargo.toml contains {dep}"}

    # 3. CLI tool
    if pkg_json:
        if "bin" in pkg_json:
            return {"type": "cli-tool", "reason": "package.json has bin field"}
        # NOTE(review): this bin/ check is assumed to apply only to projects
        # with a package.json; the nesting was inferred — confirm.
        if (root / "bin").is_dir():
            return {"type": "cli-tool", "reason": "bin/ directory exists"}

    if go_mod and (root / "cmd").is_dir():
        return {"type": "cli-tool", "reason": "go.mod with cmd/ directory"}

    if cargo and "[[bin]]" in cargo:
        return {"type": "cli-tool", "reason": "Cargo.toml has [[bin]] section"}

    if pyproject:
        if "[project.scripts]" in pyproject or "[tool.poetry.scripts]" in pyproject:
            return {"type": "cli-tool", "reason": "pyproject.toml has script entry points"}
        dep = _first_dep_in(pyproject, CLI_PYTHON_DEPS)
        if dep:
            return {"type": "cli-tool", "reason": f"pyproject.toml contains {dep}"}

    if gemfile:
        dep = _first_dep_in(gemfile, CLI_RUBY_DEPS)
        if dep:
            return {"type": "cli-tool", "reason": f"Gemfile contains {dep}"}
        if (root / "bin").is_dir() or (root / "exe").is_dir():
            return {"type": "cli-tool", "reason": "Ruby project with bin/ or exe/ directory"}

    if go_mod and (root / "main.go").exists():
        return {"type": "cli-tool", "reason": "main.go exists without web framework"}

    # 4. Library: some manifest (or a gemspec) exists but nothing above matched
    manifests = ["package.json", "Gemfile", "go.mod", "Cargo.toml", "pyproject.toml", "setup.py"]
    has_manifest = any((root / m).exists() for m in manifests) or any(root.glob("*.gemspec"))
    if has_manifest:
        return {"type": "library", "reason": "package manifest exists but no web/CLI signals"}

    # 5. Text-only
    return {"type": "text-only", "reason": "no recognized package manifest"}
def cmd_detect(args):
    """Print the detected project type as a JSON object.

    Args:
        args: Parsed CLI namespace; ``repo_root`` selects the directory to
            inspect and falls back to the current working directory when
            empty or unset.
    """
    root = args.repo_root if args.repo_root else os.getcwd()
    print(json.dumps(_detect_project_type(root)))
# --- Recommend ---
def _recommend_tier(project_type, change_type, tools):
    """Pick the capture tier best suited to a project and its installed tools.

    Args:
        project_type: ``web-app``, ``cli-tool``, ``desktop-app``, ``library``
            or anything else (treated as text-only / unknown).
        change_type: ``motion`` prefers a terminal recording for CLI tools;
            any other value is handled as discrete ``states``.
        tools: Mapping of tool key (``agent_browser``, ``vhs``, ``silicon``,
            ``ffmpeg``, ``ffprobe``) to availability; missing keys count as
            unavailable.

    Returns:
        A dict with ``recommended`` (tier name), ``available`` (every usable
        tier, always ending with ``static-screenshots``) and ``reasoning``.
    """
    browser_ok = tools.get("agent_browser", False)
    vhs_ok = tools.get("vhs", False)
    silicon_ok = tools.get("silicon", False)
    # Stitching shells out to both ffmpeg and ffprobe.
    stitch_ok = tools.get("ffmpeg", False) and tools.get("ffprobe", False)
    reel_ok = silicon_ok and stitch_ok

    def choose():
        """Return the (tier, reasoning) pair for the requested project type."""
        if project_type == "web-app":
            if browser_ok and stitch_ok:
                return "browser-reel", "Web app with agent-browser and ffmpeg available"
            if browser_ok:
                return "static-screenshots", "Web app with agent-browser but no ffmpeg/ffprobe for stitching"
            return "static-screenshots", "Web app without agent-browser"

        if project_type == "cli-tool":
            if change_type == "motion":
                # Motion favours a real recording over rendered stills.
                if vhs_ok:
                    return "terminal-recording", "CLI tool with motion, VHS available"
                if reel_ok:
                    return "screenshot-reel", "CLI tool with motion, silicon + ffmpeg available (no VHS)"
            else:
                # Discrete states favour rendered stills over a recording.
                if reel_ok:
                    return "screenshot-reel", "CLI tool with discrete states, silicon + ffmpeg available"
                if vhs_ok:
                    return "terminal-recording", "CLI tool with discrete states, VHS available (no silicon)"
            return "static-screenshots", "CLI tool with no capture tools available"

        if project_type == "desktop-app":
            if browser_ok and stitch_ok:
                return "browser-reel", "Desktop app with agent-browser and ffmpeg (via localhost/CDP)"
            return "static-screenshots", "Desktop app without agent-browser"

        if project_type == "library":
            return "static-screenshots", "Library projects use static screenshots"

        # text-only or unknown
        return "static-screenshots", "Fallback to static screenshots"

    recommended, reasoning = choose()

    # Every tier whose tool requirements are met, in fixed preference order.
    tier_gates = (
        ("browser-reel", browser_ok and stitch_ok),
        ("terminal-recording", vhs_ok),
        ("screenshot-reel", reel_ok),
        ("static-screenshots", True),  # always available
    )
    available = [tier for tier, usable in tier_gates if usable]

    return {
        "recommended": recommended,
        "available": available,
        "reasoning": reasoning,
    }
def cmd_recommend(args):
    """Print the recommended capture tier as a JSON object.

    Args:
        args: Parsed CLI namespace with ``project_type``, ``change_type`` and
            ``tools`` (a JSON object string of tool availability).

    Exits with status 1 (via ``die``) when ``--tools`` is not valid JSON or
    does not decode to a JSON object.
    """
    try:
        tools = json.loads(args.tools)
    except json.JSONDecodeError:
        die("--tools must be valid JSON")
    # json.loads also accepts arrays, strings and numbers; those have no
    # .get() and would otherwise surface as an AttributeError traceback.
    if not isinstance(tools, dict):
        die("--tools must be a JSON object")
    result = _recommend_tier(args.project_type, args.change_type, tools)
    print(json.dumps(result))
# --- Stitch ---
def _get_frame_dimensions(path):
    """Return ``(width, height)`` of the first video stream in *path*.

    ffprobe is asked for the stream's width and height in header-less CSV
    form. The process exits via ``die`` when ffprobe fails or when its output
    cannot be parsed as two integers.

    Args:
        path: Image or video file to inspect.

    Returns:
        A ``(width, height)`` tuple of ints.
    """
    result = run_cmd([
        "ffprobe", "-v", "error", "-select_streams", "v:0",
        "-show_entries", "stream=width,height", "-of", "csv=p=0", str(path),
    ])
    if result.returncode != 0:
        die(f"ffprobe failed on {path}")
    # Parse only the first output row so that additional rows or blank output
    # cannot leak an IndexError/ValueError traceback to the user.
    rows = result.stdout.strip().splitlines()
    fields = [field.strip() for field in rows[0].split(",")] if rows else []
    try:
        return int(fields[0]), int(fields[1])
    except (IndexError, ValueError):
        die(f"ffprobe returned unexpected dimensions for {path}: {result.stdout.strip()!r}")
def _stitch_frames(output, frames, duration=3.0):
    """Combine still frames into a looping animated GIF at *output*.

    Frames are normalized onto a shared even-sized canvas, then encoded with
    ffmpeg using a generated palette. If the result is larger than
    ``MAX_GIF_SIZE`` and more than two frames were supplied, a thinned frame
    list is stitched again in place of the first attempt.

    Args:
        output: Destination GIF path.
        frames: Input image paths, in display order.
        duration: Seconds each frame is shown.

    Exits via ``die`` when inputs or tools are missing or an ffmpeg/ffprobe
    step fails.
    """
    if not frames:
        die("No input frames provided")
    missing = next((frame for frame in frames if not Path(frame).exists()), None)
    if missing is not None:
        die(f"Frame not found: {missing}")
    for tool in ("ffmpeg", "ffprobe"):
        if not check_tool(tool):
            die(f"{tool} is not installed. Install with: brew install ffmpeg")

    frame_count = len(frames)
    print(f"Stitching {frame_count} frames into GIF ({duration}s per frame)...")
    workdir = tempfile.mkdtemp(prefix="evidence-stitch-")
    try:
        # Canvas = largest width/height over all frames, bumped to even numbers.
        canvas_w = canvas_h = 0
        for frame in frames:
            width, height = _get_frame_dimensions(frame)
            canvas_w, canvas_h = max(canvas_w, width), max(canvas_h, height)
        canvas_w += canvas_w % 2
        canvas_h += canvas_h % 2
        print(f" Target dimensions: {canvas_w}x{canvas_h}")

        # Shrink-to-fit, then pad: centred horizontally, anchored at the top.
        fit_filter = (
            f"scale={canvas_w}:{canvas_h}:force_original_aspect_ratio=decrease,"
            f"pad={canvas_w}:{canvas_h}:(ow-iw)/2:0:color=#0d1117"
        )
        normalized = []
        for index, frame in enumerate(frames):
            target = os.path.join(workdir, f"frame_{index:03d}.png")
            outcome = run_cmd(["ffmpeg", "-y", "-v", "error", "-i", frame, "-vf", fit_filter, target])
            if outcome.returncode != 0:
                die(f"ffmpeg failed to normalize frame: {frame}")
            normalized.append(target)
        print(f" Normalized {len(normalized)} frames")

        # Concat list: one timed entry per frame, then the last file repeated
        # without a duration (concat demuxer requirement).
        concat_file = os.path.join(workdir, "concat.txt")
        entries = [f"file '{os.path.basename(item)}'\nduration {duration}\n" for item in normalized]
        entries.append(f"file '{os.path.basename(normalized[-1])}'\n")
        with open(concat_file, "w") as fh:
            fh.writelines(entries)

        # Pass 1 builds the palette, pass 2 encodes the GIF with it.
        ffmpeg_base = ["ffmpeg", "-y", "-v", "error", "-f", "concat", "-safe", "0", "-i", concat_file]
        palette = os.path.join(workdir, "palette.png")
        palette_run = run_cmd([*ffmpeg_base, "-vf", "palettegen=stats_mode=diff", palette])
        if palette_run.returncode != 0:
            die("ffmpeg palette generation failed")
        encode_run = run_cmd([
            *ffmpeg_base,
            "-i", palette,
            "-lavfi", "paletteuse=dither=bayer:bayer_scale=3",
            "-loop", "0",
            output,
        ])
        if encode_run.returncode != 0:
            die("ffmpeg GIF encoding failed")

        gif = Path(output)
        if not gif.exists():
            die("GIF creation failed: no output file")
        size = gif.stat().st_size
        print(f" Created: {output} ({size / (1024 * 1024):.1f} MB, {frame_count} frames)")

        if size <= MAX_GIF_SIZE:
            if size > TARGET_GIF_SIZE:
                print(" Note: GIF is over 5 MB preferred target but under 10 MB limit. Acceptable.")
            return

        # Over the hard limit: keep first and last frame plus a sparse sample
        # of the middle, then stitch again.
        print(" GIF exceeds 10 MB limit. Reducing...")
        if frame_count > 2:
            print(" Dropping middle frame(s) and re-stitching...")
            step = max(2, (frame_count - 1) // 2)
            reduced = [frames[0], *frames[step:frame_count - 1:step], frames[-1]]
            if len(reduced) < frame_count:
                print(f" Reduced from {frame_count} to {len(reduced)} frames")
                # Release this attempt's scratch space before the next one.
                shutil.rmtree(workdir, ignore_errors=True)
                _stitch_frames(output, reduced, duration)
                return
        print(" WARNING: Could not reduce below 10 MB. GIF may not render inline on GitHub.")
    finally:
        shutil.rmtree(workdir, ignore_errors=True)
def cmd_stitch(args):
    """CLI entry for ``stitch``: forward the parsed arguments to ``_stitch_frames``.

    Args:
        args: Parsed CLI namespace with ``output``, ``frames`` and ``duration``.
    """
    _stitch_frames(output=args.output, frames=args.frames, duration=args.duration)
# --- Screenshot Reel ---
def cmd_screenshot_reel(args):
    """Render each text file to a PNG with silicon, then stitch them into a GIF.

    Args:
        args: Parsed CLI namespace with ``text`` (input files, one per frame),
            ``output``, ``duration``, ``lang``, ``theme`` and ``background``.

    Exits via ``die`` when a tool or input file is missing or silicon fails.
    Rendered PNGs live in a temporary directory that is always removed.
    """
    install_hints = (
        ("silicon", "brew install silicon"),
        ("ffmpeg", "brew install ffmpeg"),
    )
    for tool, hint in install_hints:
        if not check_tool(tool):
            die(f"{tool} is not installed. Install with: {hint}")

    workdir = tempfile.mkdtemp(prefix="evidence-reel-")
    try:
        rendered = []
        for index, source in enumerate(args.text):
            if not Path(source).exists():
                die(f"Text file not found: {source}")
            png_path = os.path.join(workdir, f"frame_{index:03d}.png")
            outcome = run_cmd([
                "silicon", source,
                "-o", png_path,
                "--theme", args.theme,
                "-l", args.lang,
                "--pad-horiz", "20",
                "--pad-vert", "40",
                "--no-line-number",
                "--no-round-corner",
                "--background", args.background,
            ])
            # Require both a clean exit and an actual file on disk.
            render_ok = outcome.returncode == 0 and Path(png_path).exists()
            if not render_ok:
                die(f"silicon failed to render {source}")
            rendered.append(png_path)
        print(f"Rendered {len(rendered)} frames via silicon")
        _stitch_frames(args.output, rendered, args.duration)
    finally:
        shutil.rmtree(workdir, ignore_errors=True)
# --- Terminal Recording ---
def cmd_terminal_recording(args):
    """Run a VHS tape and report the recording it produced.

    The output path is ``--output`` when given, otherwise the first
    ``Output`` directive in the tape. Only an explicit ``--output`` causes a
    temporary copy of the tape to be written (with its ``Output`` lines
    replaced, or one prepended if the tape has none); without it the tape is
    run unmodified so all of its own ``Output`` directives are preserved.

    Args:
        args: Parsed CLI namespace with ``tape`` and optional ``output``.

    Prints a human-readable summary followed by a JSON object with
    ``gif_path`` and ``size_mb``. Exits via ``die`` when vhs or the tape is
    missing, no output path can be determined, vhs fails, or no file appears.
    """
    if not check_tool("vhs"):
        die("vhs is not installed. Install with: brew install charmbracelet/tap/vhs")
    tape_path = args.tape
    if not Path(tape_path).exists():
        die(f"Tape file not found: {tape_path}")

    tape_content = Path(tape_path).read_text()
    tape_lines = tape_content.splitlines()

    def is_output_directive(line):
        """True for a tape line of the form ``Output <path>``."""
        return line.strip().startswith("Output ")

    # First Output directive in the tape (surrounding quotes removed), if any.
    tape_output = None
    for line in tape_lines:
        if is_output_directive(line):
            tape_output = line.strip().split(None, 1)[1].strip().strip('"').strip("'")
            break
    tape_has_output = tape_output is not None

    output_path = args.output or tape_output
    if not output_path:
        die("No output path: use --output or set Output in the tape file")

    actual_tape = tape_path
    tmp_tape = None
    try:
        if args.output:
            if tape_has_output:
                # Point every Output directive at the requested path.
                patched = [
                    f'Output "{output_path}"' if is_output_directive(line) else line
                    for line in tape_lines
                ]
                new_content = "\n".join(patched) + "\n"
            else:
                # No Output in tape — prepend one.
                new_content = f'Output "{output_path}"\n{tape_content}'
            fd, tmp_tape = tempfile.mkstemp(suffix=".tape", prefix="vhs-")
            os.close(fd)
            Path(tmp_tape).write_text(new_content)
            actual_tape = tmp_tape

        print(f"Running VHS tape: {tape_path}")
        result = run_cmd(["vhs", actual_tape], timeout=300)
    finally:
        # Remove the temporary tape even if writing it or running vhs raised.
        if tmp_tape and Path(tmp_tape).exists():
            Path(tmp_tape).unlink()

    if result.returncode != 0:
        die(f"VHS failed (exit {result.returncode})")
    if not Path(output_path).exists():
        die(f"VHS produced no output at {output_path}")
    size = Path(output_path).stat().st_size
    size_mb = size / (1024 * 1024)
    print(f"Recording: {output_path} ({size_mb:.1f} MB)")
    print(json.dumps({"gif_path": str(output_path), "size_mb": round(size_mb, 1)}))
# --- Upload ---
def cmd_upload(args):
    """Upload a file to catbox.moe with curl, retrying once on failure.

    On success the URL is printed twice on stdout: once in a status line and
    once on a line of its own. Diagnostics go to stderr. The process exits
    with status 1 when both attempts fail.

    Args:
        args: Parsed CLI namespace with ``file`` (path to upload).
    """
    file_path = args.file
    if not Path(file_path).exists():
        die(f"File not found: {file_path}")
    if not check_tool("curl"):
        die("curl is not installed")
    print(f"Uploading {file_path} ({file_size_mb(file_path):.1f} MB) to catbox.moe...")

    curl_cmd = [
        "curl", "-s", "--connect-timeout", "10",
        "-F", "reqtype=fileupload",
        "-F", f"fileToUpload=@{file_path}", CATBOX_API,
    ]

    def attempt():
        """Run one upload; return the trimmed response body ("" on timeout)."""
        try:
            proc = subprocess.run(curl_cmd, capture_output=True, text=True, timeout=30, check=False)
        except subprocess.TimeoutExpired:
            print("ERROR: Upload timed out after 30s", file=sys.stderr)
            return ""
        return proc.stdout.strip()

    def looks_like_url(response):
        """A successful upload answers with an https URL."""
        return response.startswith("https://")

    first = attempt()
    if looks_like_url(first):
        print(f"Uploaded: {first}")
        print(first)
        return

    failure_notes = (
        f"ERROR: Upload failed. Response: {first[:200]}",
        f"Local file preserved at: {file_path}",
        "Retrying in 2 seconds...",
    )
    for note in failure_notes:
        print(note, file=sys.stderr)
    time.sleep(2)

    # Retry once
    second = attempt()
    if not looks_like_url(second):
        print("ERROR: Retry also failed. Upload manually or commit to branch.", file=sys.stderr)
        sys.exit(1)
    print(f"Uploaded (retry): {second}")
    print(second)
# --- Main ---
def main():
    """Parse the command line and dispatch to the matching ``cmd_*`` handler.

    Prints the help text and exits with status 1 when no subcommand is given.
    """
    handlers = {
        "preflight": cmd_preflight,
        "detect": cmd_detect,
        "recommend": cmd_recommend,
        "stitch": cmd_stitch,
        "screenshot-reel": cmd_screenshot_reel,
        "terminal-recording": cmd_terminal_recording,
        "upload": cmd_upload,
    }

    epilog_text = """
Commands:
preflight Check tool availability (JSON)
detect [--repo-root PATH] Detect project type (JSON)
recommend --project-type T ... Recommend capture tier (JSON)
stitch [--duration N] OUTPUT FRAMES Stitch frames into animated GIF
screenshot-reel --output O --text F Render text via silicon + stitch
terminal-recording --output O --tape T Run VHS tape
upload FILE Upload to catbox.moe
"""
    parser = argparse.ArgumentParser(
        description="Evidence capture pipeline",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=epilog_text,
    )
    commands = parser.add_subparsers(dest="command")

    # preflight (no options)
    commands.add_parser("preflight", help="Check tool availability")

    # detect
    detect_parser = commands.add_parser("detect", help="Detect project type")
    detect_parser.add_argument("--repo-root", help="Repository root (default: cwd)")

    # recommend
    recommend_parser = commands.add_parser("recommend", help="Recommend capture tier")
    recommend_parser.add_argument(
        "--project-type",
        required=True,
        choices=["web-app", "cli-tool", "library", "desktop-app", "text-only"],
    )
    recommend_parser.add_argument("--change-type", required=True, choices=["motion", "states"])
    recommend_parser.add_argument("--tools", required=True, help="JSON object of tool availability")

    # stitch
    stitch_parser = commands.add_parser("stitch", help="Stitch frames into animated GIF")
    stitch_parser.add_argument("--duration", type=float, default=3.0, help="Seconds per frame")
    stitch_parser.add_argument("output", help="Output GIF path")
    stitch_parser.add_argument("frames", nargs="+", help="Input frame PNGs")

    # screenshot-reel
    reel_parser = commands.add_parser("screenshot-reel", help="Render text frames via silicon + stitch")
    reel_parser.add_argument("--output", required=True, help="Output GIF path")
    reel_parser.add_argument("--duration", type=float, default=2.5, help="Seconds per frame")
    reel_parser.add_argument("--lang", default="bash", help="Language for syntax highlighting")
    reel_parser.add_argument("--theme", default="Dracula", help="Silicon theme")
    reel_parser.add_argument("--background", default="#0d1117", help="Background color for frame border")
    reel_parser.add_argument("--text", nargs="+", required=True, help="Text files (one per frame)")

    # terminal-recording
    recording_parser = commands.add_parser("terminal-recording", help="Run VHS tape file")
    recording_parser.add_argument("--output", help="Output GIF path (overrides tape Output directive)")
    recording_parser.add_argument("--tape", required=True, help="VHS tape file path")

    # upload
    upload_parser = commands.add_parser("upload", help="Upload to catbox.moe")
    upload_parser.add_argument("file", help="File to upload")

    args = parser.parse_args()
    if args.command:
        handlers[args.command](args)
        return
    # No subcommand given: show usage and signal failure.
    parser.print_help()
    sys.exit(1)
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()