#!/usr/bin/env python3
"""Undeploy all apps from a test server except protected infrastructure
and (optionally) a specific recipe and its dependencies.

This frees memory so you can test one recipe at a time.

Usage:
    python3 scripts/context_reset.py                        # undeploy everything except infra
    python3 scripts/context_reset.py --recipe hedgedoc      # keep hedgedoc + its deps
    python3 scripts/context_reset.py --recipe lasuite-docs  # keep lasuite-docs + keycloak
"""

import argparse
import json
import os
import sys
from pathlib import Path

# Make the sibling ``lib`` package importable when this file is run as a script.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from lib.models import load_instance, load_default_instance, load_recipe
from lib.abra import app_ls, app_undeploy
from lib.log import SkillLogger


def load_in_use_recipes() -> set[str]:
    """Return the recipe names that currently have an in-use lockfile.

    Lockfiles are ``*.lock`` files in the ``in-use/`` directory located two
    levels above this file; the recipe name is the file name without the
    ``.lock`` suffix. Returns an empty set when that directory does not exist.
    """
    in_use_dir = Path(__file__).resolve().parent.parent / "in-use"
    if not in_use_dir.is_dir():
        return set()
    return {p.stem for p in in_use_dir.glob("*.lock")}


def resolve_dependencies(recipe_name: str, seen: set[str] | None = None) -> set[str]:
    """Recursively resolve all dependencies for a recipe.

    Args:
        recipe_name: Recipe to start from.
        seen: Accumulator of recipes already visited. It is mutated in place
            and returned; callers normally omit it.

    Returns:
        The set containing ``recipe_name`` itself plus every recipe reachable
        through ``recipe.dependencies``.
    """
    if seen is None:
        seen = set()
    # A recipe that was already visited is skipped, so dependency cycles
    # terminate instead of recursing forever.
    if recipe_name in seen:
        return seen
    seen.add(recipe_name)
    recipe = load_recipe(recipe_name)
    for dep in recipe.dependencies:
        resolve_dependencies(dep, seen)
    return seen


def _collect_apps(raw: object) -> list:
    """Flatten the parsed ``abra app ls`` JSON into a flat list of app entries.

    Accepted shapes:
      * ``{"server": {"apps": [...], ...}}`` - only dict entries are kept
      * ``{"server": [...]}``                - entries are taken as-is
      * ``[...]``                            - entries are taken as-is
    Anything else yields an empty list.
    """
    if isinstance(raw, list):
        return list(raw)
    if not isinstance(raw, dict):
        return []

    apps: list = []
    for server_data in raw.values():
        if isinstance(server_data, dict) and "apps" in server_data:
            # "apps" may be null rather than [] when a server has no apps
            # (presumably because the JSON is produced by a Go program);
            # treat that as "no apps" instead of crashing.
            apps.extend(a for a in (server_data["apps"] or []) if isinstance(a, dict))
        elif isinstance(server_data, list):
            apps.extend(server_data)
    return apps


def _describe_app(app: object) -> tuple[str, str, str]:
    """Return ``(app_name, recipe, domain)`` for one entry of the app list.

    Dict entries are read from their ``appName``/``recipe``/``domain`` keys
    (``appName`` falls back to the domain when the key is absent); any other
    entry is treated as a bare domain string with no recipe.
    """
    if isinstance(app, dict):
        domain = app.get("domain", "")
        recipe = app.get("recipe", "")
        return app.get("appName", domain), recipe, domain
    domain = str(app)
    return domain, "", domain


def _is_protected(
    app_name: str,
    recipe: str,
    domain: str,
    protected_recipes: set[str],
    protected_domains: set[str],
) -> bool:
    """Decide whether an app must be kept.

    An app is protected when its recipe equals a protected recipe, when a
    protected recipe name occurs anywhere in the app name, or when its domain
    or app name is exactly one of the protected test-dependency domains.
    """
    # NOTE(review): the substring test is deliberately loose and can protect
    # more apps than intended (e.g. recipe "docs" also matches
    # "lasuite-docs.<suffix>"); it errs on the side of keeping apps running.
    if any(recipe == pr or pr in app_name for pr in protected_recipes):
        return True
    # Test dependencies are matched exactly. Checking the app name as well as
    # the domain covers entries whose "domain" field is missing or differs.
    return domain in protected_domains or app_name in protected_domains


def main():
    """Undeploy every app on the instance that is not protected.

    Protected apps are: the instance's infrastructure recipes, recipes with an
    in-use lockfile, and - when ``--recipe`` is given - that recipe, its
    transitive dependencies and the domains of its direct ``test_requires``.

    Exits with status 1 if the app list cannot be retrieved. Individual
    undeploy failures are reported in the log but do not change the exit
    status (the loop is best-effort).
    """
    parser = argparse.ArgumentParser(
        description="Undeploy non-protected apps from a test server"
    )
    parser.add_argument("--recipe", default=None,
                        help="Recipe to keep (with its dependencies)")
    parser.add_argument("--instance", default=None,
                        help="Instance name (default: active)")
    args = parser.parse_args()

    log = SkillLogger("context-reset", args.recipe)

    # Resolve instance
    if args.instance:
        inst = load_instance(args.instance)
    else:
        inst = load_default_instance()

    log.info(f"Instance: {inst.name} ({inst.server})")

    # Build protected set (recipe names matched by recipe field)
    infra_recipes = set(inst.protected_recipes)
    protected_recipes = set(infra_recipes)

    # Protect any recipes currently locked by other agents
    in_use = load_in_use_recipes()
    if in_use:
        log.info(f"In-use lockfiles found, protecting: {', '.join(sorted(in_use))}")
        protected_recipes.update(in_use)

    # Protected domains matched exactly (for test_requires)
    protected_domains: set[str] = set()
    deps: set[str] = set()

    if args.recipe:
        deps = resolve_dependencies(args.recipe)
        protected_recipes.update(deps)

        # Also include test_requires for the target recipe (not transitive).
        # test_requires are domain prefixes, expanded to full domains.
        target = load_recipe(args.recipe)
        for prefix in target.test_requires:
            protected_domains.add(f"{prefix}.{inst.domain_suffix}")

    log.step("Protected recipes")
    for r in sorted(protected_recipes):
        if r in infra_recipes:
            reason = "infrastructure"
        elif r in deps:
            reason = "target/dependency"
        else:
            # Only reachable for recipes added from lockfiles.
            reason = "in-use lock"
        log.info(f" {r} ({reason})")
    for d in sorted(protected_domains):
        log.info(f" {d} (test dependency)")

    # List deployed apps
    log.step("List deployed apps")
    result = app_ls(inst.server)
    if not result.ok:
        log.error(f"Failed to list apps: {result.stderr}")
        log.save()
        sys.exit(1)

    # Parse the output - abra app ls -S -m outputs JSON:
    # {"server": {"apps": [...], "appCount": N, ...}}
    apps_data = _collect_apps(result.json())

    if not apps_data:
        log.info("No apps deployed")
        log.save()
        return

    # Categorize apps
    to_keep = []
    to_undeploy = []

    for app in apps_data:
        app_name, recipe, domain = _describe_app(app)
        if not app_name:
            continue
        if _is_protected(app_name, recipe, domain, protected_recipes, protected_domains):
            to_keep.append(app_name)
        else:
            to_undeploy.append(app_name)

    log.step("Plan")
    log.info(f"Keeping {len(to_keep)} app(s)")
    for d in to_keep:
        log.info(f" KEEP: {d}")
    log.info(f"Undeploying {len(to_undeploy)} app(s)")
    for d in to_undeploy:
        log.info(f" UNDEPLOY: {d}")

    if not to_undeploy:
        log.info("Nothing to undeploy")
        log.save()
        return

    # Undeploy
    log.step("Undeploy")
    failed = []
    for domain in to_undeploy:
        log.info(f"Undeploying {domain} ...")
        result = app_undeploy(domain, timeout=60)
        log.command(result.command, result.stdout, result.returncode)
        # Assumes the usual convention that a zero return code means success.
        # A failure is recorded but does not stop the remaining undeploys.
        if result.returncode != 0:
            failed.append(domain)

    log.step("Done")
    log.info(f"Undeployed {len(to_undeploy) - len(failed)} app(s)")
    if failed:
        log.info(f"{len(failed)} app(s) did not undeploy cleanly: {', '.join(failed)}")
    log.save()


if __name__ == "__main__":
    main()