This repository has been archived on 2021-07-22. You can view files and clone it, but cannot push or open issues or pull requests.
abra/bin/app-json.py

238 lines
6.8 KiB
Python
Executable File

#!/usr/bin/env python3
# Usage: ./app-json.py [--output PATH]
#
# Gather metadata from Co-op Cloud apps in $ABRA_DIR/apps (default
# ~/.abra/apps), and format it as JSON so that it can be hosted here:
# https://apps.coopcloud.tech
import argparse
from json import dump
from os import chdir, environ, getcwd, listdir
from os.path import basename
from pathlib import Path
from re import findall, search
from subprocess import DEVNULL
from requests import get
from abralib import (
CLONES_PATH,
JQ_PATH,
REPOS_TO_SKIP,
YQ_PATH,
_run_cmd,
clone_all_apps,
get_repos_json,
log,
)
# Command-line interface: one optional --output path, converted to a Path.
# When omitted it defaults to apps.json inside the current working directory.
parser = argparse.ArgumentParser(description="Generate a new apps.json")
parser.add_argument(
    "--output",
    default=f"{getcwd()}/apps.json",
    type=Path,
)
def skopeo_login():
    """Log into the docker registry to avoid rate limits.

    Credentials are read from the ``SKOPEO_USER`` and ``SKOPEO_PASSWORD``
    environment variables; the registry is read from ``SKOPEO_REGISTRY`` and
    defaults to ``docker.io``. When either credential is missing or empty,
    no login is attempted and only a log line is written.
    """
    # Imported here so the function carries its own dependency.
    from shlex import quote

    user = environ.get("SKOPEO_USER")
    password = environ.get("SKOPEO_PASSWORD")
    registry = environ.get("SKOPEO_REGISTRY", "docker.io")

    if not user or not password:
        log.info("Failed to log in via Skopeo due to missing env vars")
        return

    # The command string is run with shell=True, so every interpolated value
    # is quoted. Without quoting, a password containing spaces, quotes, "$",
    # "&" or ";" would be split, expanded or executed by the shell rather
    # than being passed to skopeo verbatim.
    login_cmd = (
        f"skopeo login {quote(registry)} -u {quote(user)} -p {quote(password)}"
    )
    output = _run_cmd(login_cmd, shell=True)
    log.info(f"Skopeo login attempt: {output}")
def get_published_apps_json():
    """Fetch the apps JSON document that is already published.

    Returns the decoded JSON body of https://apps.coopcloud.tech. Any failure
    while requesting or decoding it is logged and an empty dict is returned
    instead, so callers can always treat the result as "no cache".
    """
    url = "https://apps.coopcloud.tech"
    log.info(f"Retrieving {url}")

    try:
        response = get(url, timeout=5)
        published = response.json()
    except Exception as exception:
        # Best effort: the published file is only used as a cache.
        log.error(f"Failed to retrieve {url}, saw {str(exception)}")
        published = {}

    return published
def generate_apps_json(repos_json):
    """Build the mapping that gets written out as the apps JSON file.

    Walks every entry of CLONES_PATH (except those in REPOS_TO_SKIP) and
    combines three sources per app: the matching record from ``repos_json``
    (matched on its "name" key), the metadata parsed from the README, and the
    per-tag versions. Returns a dict keyed by the clone directory name.
    """
    cached_apps_json = get_published_apps_json()
    apps_json = {}

    for app in listdir(CLONES_PATH):
        if app in REPOS_TO_SKIP:
            log.info(f"Skipping {app}")
            continue

        # First repository record with this name, or {} when there is none.
        repo_details = next(
            (repo for repo in repos_json if repo["name"] == app),
            {},
        )

        app_path = f"{CLONES_PATH}/{app}"
        chdir(app_path)

        metadata = get_app_metadata(app_path)
        # "name" moves to the top level; what is left becomes "features".
        name = metadata.pop("name", app)

        log.info(f"Processing {app}")

        entry = {
            "name": name,
            "category": metadata.get("category", ""),
            "repository": repo_details.get("clone_url", ""),
            "default_branch": repo_details.get("default_branch", ""),
            "description": repo_details.get("description", ""),
            "website": repo_details.get("website", ""),
            "features": metadata,
            "versions": get_app_versions(app_path, cached_apps_json),
            "icon": repo_details.get("avatar_url", ""),
        }
        apps_json[app] = entry

    return apps_json
def get_app_metadata(app_path):
    """Parse metadata from app repo README files.

    Every ``**...`` span (from the first ``**`` on a line to the end of that
    line) in ``{app_path}/README.md`` is treated as one metadata entry whose
    key is the lower-cased text between the asterisks:

    * ``image`` becomes a dict with "image", "url", "rating" and "source";
    * ``status`` becomes a numeric score (1-4 for the known markers, 5 for
      "?", an empty value, or any marker that is not recognised);
    * anything else keeps the text after the last ":" with "*" removed.

    The "name" entry is taken from a "# " heading at the very start of the
    file. Returns the metadata dict, or an empty dict when the README cannot
    be read or does not have the expected shape.
    """
    status_scores = {"❶💚": 1, "❷💛": 2, "❸🍎": 3, "❹💣": 4, "?": 5, "": 5}
    unknown_status_score = 5
    metadata = {}

    chdir(app_path)

    try:
        # READMEs contain emoji, so decode as UTF-8 rather than relying on
        # the locale's default encoding.
        with open(f"{app_path}/README.md", "r", encoding="utf-8") as handle:
            log.info(f"{app_path}/README.md")
            contents = handle.read()
    except (OSError, UnicodeError):
        log.info(f"No {app_path}/README.md discovered, moving on")
        return {}

    try:
        for match in findall(r"\*\*.*", contents):
            # search() returns None when there is no closing "**"; the
            # resulting AttributeError is handled below.
            title = search(r"(?<=\*\*).*(?=\*\*)", match).group().lower()
            plain_value = match.split(":")[-1].replace("*", "").strip()

            if title == "image":
                value = {
                    "image": search(r"(?<=`).*(?=`)", match).group(),
                    "url": search(r"(?<=\().*(?=\))", match).group(),
                    "rating": match.split(",")[1].strip(),
                    "source": match.split(",")[-1].replace("*", "").strip(),
                }
            elif title == "status":
                # An unrecognised marker is scored like "?" instead of
                # raising KeyError and aborting the whole run.
                value = status_scores.get(plain_value, unknown_status_score)
            else:
                value = plain_value

            metadata[title] = value

        # No MULTILINE flag: "^" only matches at the start of the file.
        metadata["name"] = findall(r"^# (.*)", contents)[0]
    except (IndexError, AttributeError):
        log.info(f"Can't parse {app_path}/README.md")
        return {}
    finally:
        # Runs on success and on parse failure alike.
        # NOTE(review): the reason for this checkout is not evident here.
        _run_cmd("git checkout HEAD")

    log.info(f"Parsed {metadata}")

    return metadata
def get_app_versions(app_path, cached_apps_json):
    """Collect the image versions used by every tagged release of an app.

    Each git tag in ``app_path`` is checked out in turn, the services in its
    ``compose*.yml`` files are listed with yq, and for each service the image
    reference is recorded as ``{"image", "tag", "digest"}`` (digest shortened
    to 8 characters, looked up with skopeo). Entries already present in
    ``cached_apps_json[<app name>]["versions"][<tag>]`` are reused instead of
    being looked up again.

    Returns a dict ``{tag: {service: version_info}}``, or an empty dict when
    the repository has no tags. The branch that was checked out on entry is
    restored before returning.
    """
    chdir(app_path)

    tags = _run_cmd("git tag --list").split()
    if not tags:
        log.info("No tags discovered, moving on")
        return {}

    initial_branch = _run_cmd("git rev-parse --abbrev-ref HEAD")
    app_name = basename(app_path)

    # Look the app up in the cache once instead of on every service.
    try:
        cached_versions = cached_apps_json[app_name]["versions"]
    except KeyError:
        cached_versions = {}

    versions = {}
    for tag in tags:
        _run_cmd(f"git checkout {tag}", stderr=DEVNULL)

        services_cmd = f"{YQ_PATH} e '.services | keys | .[]' compose*.yml"
        services = _run_cmd(services_cmd, shell=True).split()

        cached_services = cached_versions.get(tag, {})
        parsed_services = set()
        service_versions = {}

        for service in services:
            # Placeholder tokens in the yq output, not service names.
            if service in ("null", "---"):
                continue

            if service in cached_services:
                log.info(f"Skipping {tag} because we've already processed it")
                service_versions[service] = cached_services[service]
                # Stay on the tag: switching back to the initial branch here
                # would make the remaining, uncached services of this tag be
                # read from the wrong checkout.
                continue

            if service in parsed_services:
                log.info(f"Skipped {service}, we've already parsed it locally")
                continue

            services_cmd = f"{YQ_PATH} e '.services.{service}.image' compose*.yml"
            images = _run_cmd(services_cmd, shell=True).split()

            for image in images:
                if image in ("null", "---"):
                    continue

                images_cmd = f"skopeo inspect docker://{image} | {JQ_PATH} '.Digest'"
                output = _run_cmd(images_cmd, shell=True)

                # NOTE(review): splitting on ":" mis-parses references whose
                # registry host carries a port, and untagged images.
                service_version_info = {
                    "image": image.split(":")[0],
                    "tag": image.split(":")[-1],
                    "digest": output.split(":")[-1][:8],
                }

                log.info(f"Parsed {service_version_info}")
                service_versions[service] = service_version_info

            parsed_services.add(service)

        versions[tag] = service_versions

    # Restore the checkout that was active when the function was called.
    _run_cmd(f"git checkout {initial_branch}")

    return versions
def main():
    """Run the script.

    Parses the command line, logs into the registry, fetches and clones the
    app repositories, then writes the generated apps JSON to ``--output``.
    """
    args = parser.parse_args()

    skopeo_login()

    repos_json = get_repos_json()
    clone_all_apps(repos_json)

    # Generate everything before opening the output file. Opening it in "w"
    # mode truncates it immediately, so a failure during generation would
    # otherwise leave an empty file in place of a previously good one.
    apps_json = generate_apps_json(repos_json)

    with open(args.output, "w", encoding="utf-8") as handle:
        dump(
            apps_json,
            handle,
            ensure_ascii=False,
            indent=4,
            sort_keys=True,
        )

    log.info(f"Successfully generated {args.output}")
# Only run when executed as a script, so importing this module (for example
# from a test) does not trigger logins, clones or network requests.
if __name__ == "__main__":
    main()