abra/bin/app-json.py

284 lines
8.2 KiB
Python
Executable File

#!/usr/bin/env python3
# Usage: ./app-json.py
#
# Gather metadata from Co-op Cloud apps in $ABRA_DIR/apps (default
# ~/.abra/apps), and format it as JSON so that it can be hosted here:
# https://apps.coopcloud.tech
from json import dump
from logging import DEBUG, basicConfig, getLogger
from os import chdir, listdir, mkdir
from os.path import basename, exists, expanduser
from pathlib import Path
from re import findall, search
from shlex import split
from subprocess import DEVNULL, check_output
from sys import exit
from requests import get
HOME_PATH = expanduser("~/")
CLONES_PATH = Path(f"{HOME_PATH}/.abra/apps").absolute()
YQ_PATH = Path(f"{HOME_PATH}/.abra/vendor/yq")
SCRIPT_PATH = Path(__file__).absolute().parent
REPOS_TO_SKIP = (
"abra",
"abra-apps",
"backup-bot",
"cloud.autonomic.zone",
"docs.cloud.autonomic.zone",
"example",
"organising",
"pyabra",
"stack-ssh-deploy",
"radicle-seed-node",
"coturn"
)
log = getLogger(__name__)
basicConfig()
log.setLevel(DEBUG)
def _run_cmd(cmd, shell=False, **kwargs):
"""Run a shell command."""
args = [split(cmd)]
if shell:
args = [cmd]
kwargs = {"shell": shell}
try:
return check_output(*args, **kwargs).decode("utf-8").strip()
except Exception as exception:
log.error(f"Failed to run {cmd}, saw {str(exception)}")
exit(1)
def get_repos_json():
""" Retrieve repo list from Gitea """
url = "https://git.autonomic.zone/api/v1/orgs/coop-cloud/repos"
log.info(f"Retrieving {url}")
repos = []
response = True
page = 1
try:
while response:
log.info(f"Trying to fetch page {page}")
response = get(url + f"?page={page}",timeout=10).json()
repos.extend(response)
page += 1
return repos
except Exception as exception:
log.error(f"Failed to retrieve {url}, saw {str(exception)}")
exit(1)
def get_published_apps_json():
"""Retrieve already published apps json."""
url = "https://apps.coopcloud.tech"
log.info(f"Retrieving {url}")
try:
return get(url, timeout=5).json()
except Exception as exception:
log.error(f"Failed to retrieve {url}, saw {str(exception)}")
return {}
def clone_all_apps(repos_json):
"""Clone all Co-op Cloud apps to ~/.abra/apps."""
if not exists(CLONES_PATH):
mkdir(CLONES_PATH)
repos = [[p["name"], p["ssh_url"]] for p in repos_json]
for name, url in repos:
continue
if name in REPOS_TO_SKIP:
continue
if not exists(f"{CLONES_PATH}/{name}"):
log.info(f"Retrieving {url}")
_run_cmd(f"git clone {url} {CLONES_PATH}/{name}")
chdir(f"{CLONES_PATH}/{name}")
if not int(_run_cmd("git branch --list | wc -l", shell=True)):
log.info(f"Guessing main branch is HEAD for {name}")
_run_cmd("git checkout main")
else:
log.info(f"Updating {name}")
chdir(f"{CLONES_PATH}/{name}")
_run_cmd("git fetch -a")
def generate_apps_json(repos_json):
"""Generate the abra-apps.json application versions file."""
apps_json = {}
cached_apps_json = get_published_apps_json()
for app in listdir(CLONES_PATH):
if app in REPOS_TO_SKIP:
log.info(f"Skipping {app}")
continue
repo_details = next(filter(lambda x: x['name'] == app, repos_json), {})
app_path = f"{CLONES_PATH}/{app}"
chdir(app_path)
metadata = get_app_metadata(app_path)
name = metadata.pop("name", "")
log.info(f"Processing {app}")
apps_json[app] = {
"name": name,
"category": metadata.get("category", ""),
"repository": repo_details.get("clone_url", ""),
"default_branch": repo_details.get("default_branch", ""),
"description": repo_details.get("description", ""),
"website": repo_details.get("website", ""),
# Note(decentral1se): please note that the app metadata do not
# correspond to version tags. We simply parse the latest metadata
# list from HEAD. This may lead to unexpected situations where
# users believe X feature is available under Y version but it is
# not.
"features": metadata,
"versions": get_app_versions(app_path, cached_apps_json),
"icon": repo_details.get("avatar_url", "")
}
return apps_json
def get_app_metadata(app_path):
"""Parse metadata from app repo README files."""
metadata = {}
chdir(app_path)
with open(f"{app_path}/README.md", "r") as handle:
log.info(f"{app_path}/README.md")
contents = handle.read()
try:
for match in findall(r"\*\*.*\s\*", contents):
title = search(r"(?<=\*\*).*(?=\*\*)", match).group().lower()
if title == "image":
value = {
"image": search(r"(?<=`).*(?=`)", match).group(),
"url": search(r"(?<=\().*(?=\))", match).group(),
"rating": match.split(",")[1].strip(),
"source": match.split(",")[-1].replace("*", "").strip(),
}
else:
value = match.split(":")[-1].replace("*", "").strip()
metadata[title] = value
metadata["name"] = findall(r"^# (.*)", contents)[0]
except (IndexError, AttributeError):
log.info(f"Can't parse {app_path}/README.md")
return {}
finally:
_run_cmd("git checkout HEAD")
log.info(f"Parsed {metadata}")
return metadata
def get_app_versions(app_path, cached_apps_json):
versions = {}
chdir(app_path)
tags = _run_cmd("git tag --list").split()
if not tags:
log.info("No tags discovered, moving on")
return {}
initial_branch = _run_cmd("git rev-parse --abbrev-ref HEAD")
app_name = basename(app_path)
try:
existing_tags = cached_apps_json[app_name]["versions"].keys()
except KeyError:
existing_tags = []
for tag in tags:
_run_cmd(f"git checkout {tag}",
stderr=DEVNULL)
services_cmd = f"{YQ_PATH} e '.services | keys | .[]' compose*.yml"
services = _run_cmd(services_cmd, shell=True).split()
parsed_services = []
service_versions = {}
for service in services:
if service in ("null", "---"):
continue
if tag in existing_tags and service in cached_apps_json[app_name]["versions"][tag]:
log.info(f"Skipping {tag} because we've already processed it")
existing_versions = cached_apps_json[app_name]["versions"][tag][service]
service_versions[service] = existing_versions
_run_cmd(f"git checkout {initial_branch}")
continue
if service in parsed_services:
log.info(f"Skipped {service}, we've already parsed it locally")
continue
services_cmd = f"{YQ_PATH} e '.services.{service}.image' compose*.yml"
images = _run_cmd(services_cmd, shell=True).split()
for image in images:
if image in ("null", "---"):
continue
images_cmd = f"skopeo inspect docker://{image} | jq '.Digest'"
output = _run_cmd(images_cmd, shell=True)
service_version_info = {
"image": image.split(":")[0],
"tag": image.split(":")[-1],
"digest": output.split(":")[-1][:8],
}
log.info(f"Parsed {service_version_info}")
service_versions[service] = service_version_info
parsed_services.append(service)
versions[tag] = service_versions
_run_cmd(f"git checkout {initial_branch}")
return versions
def main():
"""Run the script."""
repos_json = get_repos_json()
clone_all_apps(repos_json)
target = f"{SCRIPT_PATH}/../deploy/apps.coopcloud.tech/apps.json"
with open(target, "w", encoding="utf-8") as handle:
dump(generate_apps_json(repos_json), handle, ensure_ascii=False, indent=4)
log.info(f"Successfully generated {target}")
main()