#!/usr/bin/env python3

# Usage: ./app-json.py
#
# Gather metadata from Co-op Cloud apps in $ABRA_DIR/apps (default
# ~/.abra/apps), and format it as JSON so that it can be hosted here:
# https://apps.coopcloud.tech

from json import dump
from logging import DEBUG, basicConfig, getLogger
from os import chdir, listdir, mkdir
from os.path import basename, exists, expanduser
from pathlib import Path
from re import findall, search
from shlex import split
from subprocess import DEVNULL, check_output
from sys import exit

from requests import get

HOME_PATH = expanduser("~/")
CLONES_PATH = Path(f"{HOME_PATH}/.abra/apps").absolute()
YQ_PATH = Path(f"{HOME_PATH}/.abra/vendor/yq")
SCRIPT_PATH = Path(__file__).absolute().parent
REPOS_TO_SKIP = (
    "abra",
    "abra-apps",
    "backup-bot",
    "coopcloud.tech",
    "coturn",
    "docs.coopcloud.tech",
    "example",
    "organising",
    "pyabra",
    "radicle-seed-node",
    "stack-ssh-deploy",
)

# README status badge -> numeric score. "?" and a missing status both map
# to 5.
STATUS_SCORES = {
    "❶💚": 1,
    "❷💛": 2,
    "❸🍎": 3,
    "❹💣": 4,
    "?": 5,
    "": 5,
}

log = getLogger(__name__)
basicConfig()
log.setLevel(DEBUG)


def _run_cmd(cmd, shell=False, **kwargs):
    """Run a command and return its stripped, UTF-8 decoded stdout.

    When ``shell`` is false the command string is tokenised with
    ``shlex.split`` and executed directly; when true it is handed to the
    shell unchanged. Any extra keyword arguments are forwarded to
    ``subprocess.check_output`` in both modes.

    On any failure the error is logged and the whole script exits with
    status 1.
    """
    if shell:
        args = [cmd]
        # Merge instead of replacing so caller options (e.g. stderr=DEVNULL)
        # are not silently dropped in shell mode.
        kwargs["shell"] = shell
    else:
        args = [split(cmd)]

    try:
        return check_output(*args, **kwargs).decode("utf-8").strip()
    except Exception as exception:
        log.error(f"Failed to run {cmd}, saw {str(exception)}")
        exit(1)


def get_repos_json():
    """Retrieve the repo list from Gitea, following pagination.

    Pages are requested starting at 1 until a page returns an empty
    (falsy) JSON body. Exits the script with status 1 on any error.
    """
    url = "https://git.autonomic.zone/api/v1/orgs/coop-cloud/repos"

    log.info(f"Retrieving {url}")

    repos = []
    response = True
    page = 1

    try:
        while response:
            log.info(f"Trying to fetch page {page}")
            response = get(url + f"?page={page}", timeout=10).json()
            repos.extend(response)
            page += 1

        return repos
    except Exception as exception:
        log.error(f"Failed to retrieve {url}, saw {str(exception)}")
        exit(1)


def get_published_apps_json():
    """Retrieve the already published apps JSON.

    Returns an empty dict (after logging the error) when the request or
    the JSON decoding fails, so callers simply see no cached entries.
    """
    url = "https://apps.coopcloud.tech"

    log.info(f"Retrieving {url}")

    try:
        return get(url, timeout=5).json()
    except Exception as exception:
        log.error(f"Failed to retrieve {url}, saw {str(exception)}")
        return {}


def clone_all_apps(repos_json):
    """Clone all Co-op Cloud apps to ~/.abra/apps.

    Repos named in REPOS_TO_SKIP are ignored. A repo without a local
    clone is cloned via its ``ssh_url``; an existing clone is refreshed
    with ``git fetch -a``.
    """
    # Also creates missing parent directories (e.g. ~/.abra).
    CLONES_PATH.mkdir(parents=True, exist_ok=True)

    for repo in repos_json:
        name, url = repo["name"], repo["ssh_url"]

        if name in REPOS_TO_SKIP:
            continue

        clone_path = f"{CLONES_PATH}/{name}"

        if not exists(clone_path):
            log.info(f"Retrieving {url}")
            _run_cmd(f"git clone {url} {clone_path}")

            chdir(clone_path)
            # No local branch listed after cloning: fall back to "main".
            if not int(_run_cmd("git branch --list | wc -l", shell=True)):
                log.info(f"Guessing main branch is HEAD for {name}")
                _run_cmd("git checkout main")
        else:
            log.info(f"Updating {name}")
            chdir(clone_path)
            _run_cmd("git fetch -a")


def generate_apps_json(repos_json):
    """Generate the abra-apps.json application versions file.

    Walks every directory entry under CLONES_PATH (except REPOS_TO_SKIP)
    and combines README metadata, Gitea repo details and per-tag version
    information into one dict keyed by app directory name.
    """
    apps_json = {}
    cached_apps_json = get_published_apps_json()

    for app in listdir(CLONES_PATH):
        if app in REPOS_TO_SKIP:
            log.info(f"Skipping {app}")
            continue

        # Fall back to an empty dict when Gitea did not list this clone.
        repo_details = next(filter(lambda x: x["name"] == app, repos_json), {})

        app_path = f"{CLONES_PATH}/{app}"
        chdir(app_path)

        metadata = get_app_metadata(app_path)
        # "name" is reported separately; what remains are the features.
        name = metadata.pop("name", "")

        log.info(f"Processing {app}")
        apps_json[app] = {
            "name": name,
            "category": metadata.get("category", ""),
            "repository": repo_details.get("clone_url", ""),
            "default_branch": repo_details.get("default_branch", ""),
            "description": repo_details.get("description", ""),
            "website": repo_details.get("website", ""),
            # Note(decentral1se): please note that the app metadata do not
            # correspond to version tags. We simply parse the latest metadata
            # list from HEAD. This may lead to unexpected situations where
            # users believe X feature is available under Y version but it is
            # not.
            "features": metadata,
            "versions": get_app_versions(app_path, cached_apps_json),
            "icon": repo_details.get("avatar_url", ""),
        }

    return apps_json


def get_app_metadata(app_path):
    """Parse metadata from an app repo's README.md.

    Returns a dict of lower-cased bold titles mapped to their values,
    plus "name" taken from the first "# " heading match. Returns an empty
    dict when the README is missing or cannot be parsed, including when
    it carries a status badge that is not in STATUS_SCORES.
    """
    metadata = {}

    chdir(app_path)

    try:
        # README files contain emoji status badges, so decode explicitly as
        # UTF-8 instead of relying on the platform default encoding.
        with open(f"{app_path}/README.md", "r", encoding="utf-8") as handle:
            log.info(f"{app_path}/README.md")
            contents = handle.read()
    except FileNotFoundError:
        log.info(f"Can't parse {app_path}/README.md")
        return {}

    try:
        for match in findall(r"\*\*.*\s\*", contents):
            title = search(r"(?<=\*\*).*(?=\*\*)", match).group().lower()

            if title == "image":
                value = {
                    "image": search(r"(?<=`).*(?=`)", match).group(),
                    "url": search(r"(?<=\().*(?=\))", match).group(),
                    "rating": match.split(",")[1].strip(),
                    "source": match.split(",")[-1].replace("*", "").strip(),
                }
            elif title == "status":
                status = match.split(":")[-1].replace("*", "").strip()
                value = STATUS_SCORES[status]
            else:
                value = match.split(":")[-1].replace("*", "").strip()

            metadata[title] = value

        metadata["name"] = findall(r"^# (.*)", contents)[0]
    except (IndexError, AttributeError, KeyError):
        # KeyError: unrecognised status badge; treat like any other
        # unparseable README instead of aborting the whole run.
        log.info(f"Can't parse {app_path}/README.md")
        return {}
    finally:
        # NOTE(review): kept from the original; presumably meant to leave
        # the clone on HEAD after parsing -- confirm it is still needed.
        _run_cmd("git checkout HEAD")

    log.info(f"Parsed {metadata}")

    return metadata


def get_app_versions(app_path, cached_apps_json):
    """Collect per-tag service image versions for an app repo.

    For every git tag, the tag is checked out and each compose service's
    image is inspected with skopeo, unless that tag/service pair is
    already present in ``cached_apps_json``, in which case the cached
    entry is reused. Returns ``{tag: {service: {"image", "tag",
    "digest"}}}``, or an empty dict when the repo has no tags.
    """
    versions = {}

    chdir(app_path)

    tags = _run_cmd("git tag --list").split()

    if not tags:
        log.info("No tags discovered, moving on")
        return {}

    initial_branch = _run_cmd("git rev-parse --abbrev-ref HEAD")

    app_name = basename(app_path)
    cached_versions = cached_apps_json.get(app_name, {}).get("versions", {})

    for tag in tags:
        _run_cmd(f"git checkout {tag}", stderr=DEVNULL)

        services_cmd = f"{YQ_PATH} e '.services | keys | .[]' compose*.yml"
        services = _run_cmd(services_cmd, shell=True).split()

        parsed_services = []
        service_versions = {}
        cached_services = cached_versions.get(tag, {})

        for service in services:
            # yq emits "null" / "---" for empty documents and separators.
            if service in ("null", "---"):
                continue

            if service in cached_services:
                log.info(f"Skipping {tag} because we've already processed it")
                # Stay on the tag checkout: remaining services of this tag
                # must still be read from the tag's own compose files.
                service_versions[service] = cached_services[service]
                continue

            if service in parsed_services:
                log.info(f"Skipped {service}, we've already parsed it locally")
                continue

            services_cmd = f"{YQ_PATH} e '.services.{service}.image' compose*.yml"
            images = _run_cmd(services_cmd, shell=True).split()

            for image in images:
                if image in ("null", "---"):
                    continue

                images_cmd = f"skopeo inspect docker://{image} | jq '.Digest'"
                output = _run_cmd(images_cmd, shell=True)

                service_version_info = {
                    "image": image.split(":")[0],
                    "tag": image.split(":")[-1],
                    # First 8 characters of the part after the last ":".
                    "digest": output.split(":")[-1][:8],
                }

                log.info(f"Parsed {service_version_info}")

                service_versions[service] = service_version_info
                parsed_services.append(service)

        versions[tag] = service_versions

    # Restore the branch the clone was on before walking the tags.
    _run_cmd(f"git checkout {initial_branch}")

    return versions


def main():
    """Run the script."""
    repos_json = get_repos_json()
    clone_all_apps(repos_json)

    target = f"{SCRIPT_PATH}/../deploy/apps.coopcloud.tech/apps.json"
    with open(target, "w", encoding="utf-8") as handle:
        dump(generate_apps_json(repos_json), handle, ensure_ascii=False, indent=4)

    log.info(f"Successfully generated {target}")


if __name__ == "__main__":
    main()