"""Flask blueprint for the account console: capsul list/detail/create, SSH keys, billing."""

import re
import sys

from flask import Blueprint
from flask import flash
from flask import current_app
from flask import g
from flask import redirect
from flask import url_for
from flask import request
from flask import session
from flask import render_template
from flask_mail import Message
from werkzeug.exceptions import abort
from nanoid import generate

from capsulflask.auth import account_required
from capsulflask.db import get_model, my_exec_info_message

bp = Blueprint("console", __name__, url_prefix="/console")


def makeCapsulId():
    """Return a new capsul id: ``capsul-`` followed by 10 random lowercase alphanumerics."""
    lettersAndNumbers = generate(alphabet="1234567890qwertyuiopasdfghjklzxcvbnm", size=10)
    return f"capsul-{lettersAndNumbers}"


def double_check_capsul_address(db_model, email, id, ipv4):
    """Reconcile the stored IPv4 address of a capsul with the virtualization model.

    Looks the capsul up through ``current_app.config["VIRTUALIZATION_MODEL"]``.
    When the reported address differs from ``ipv4``, the database row is
    updated via ``db_model.updateVm`` and the new address is returned.

    This is best-effort: any error raised by the lookup or the update is
    printed and the address passed in is returned unchanged.

    Args:
        db_model: database model exposing ``updateVm(email=, id=, ipv4=)``.
        email: account email that owns the capsul.
        id: capsul id.
        ipv4: the address currently stored for the capsul (may be falsy).

    Returns:
        The address reported by the virtualization model, or ``ipv4`` when the
        lookup failed.
    """
    try:
        result = current_app.config["VIRTUALIZATION_MODEL"].get(id)
        if result.ipv4 != ipv4:
            ipv4 = result.ipv4
            db_model.updateVm(email=email, id=id, ipv4=result.ipv4)
    except Exception:
        # The original handler formatted ``vm['id']``, a name that does not
        # exist in this scope, so the error path itself raised NameError.
        print(
            "error occurred in list capsuls endpoint while trying to grab ip address "
            f"of {id} via the virtualization model: {my_exec_info_message(sys.exc_info())}"
        )

    return ipv4


@bp.route("/")
@account_required
def index():
    """Render the list of capsuls belonging to the logged-in account."""
    db_model = get_model()
    vms = db_model.list_vms_for_account(email=session["account"])

    # For now we check the IP according to the virt model on every request.
    # This could be done by a background job and cached later on...
    for vm in vms:
        vm["ipv4"] = double_check_capsul_address(db_model, session["account"], vm["id"], vm["ipv4"])

    vms = [
        dict(
            id=x["id"],
            size=x["size"],
            # A capsul without an address yet is shown as still booting.
            ipv4=(x["ipv4"] if x["ipv4"] else "..booting.."),
            ipv4_status=("ok" if x["ipv4"] else "waiting-pulse"),
            os=x["os"],
            created=x["created"].strftime("%b %d %Y %H:%M"),
        )
        for x in vms
    ]

    return render_template("capsuls.html", vms=vms, has_vms=len(vms) > 0)


# The rule needs the ``id`` URL variable: registered on plain "/" it collides
# with ``index`` and Flask can never supply the ``id`` argument.
@bp.route("/<string:id>")
@account_required
def detail(id):
    """Render the detail page for one capsul, or 404 if the account has no such capsul."""
    db_model = get_model()
    vm = db_model.get_vm_detail(email=session["account"], id=id)

    if vm is None:
        return abort(404, f"{id} doesn't exist.")

    vm["ipv4"] = double_check_capsul_address(db_model, session["account"], vm["id"], vm["ipv4"])
    vm["created"] = vm["created"].strftime("%b %d %Y %H:%M")
    vm["ssh_public_keys"] = ", ".join(vm["ssh_public_keys"]) if len(vm["ssh_public_keys"]) > 0 else ""

    return render_template("capsul-detail.html", vm=vm)


@bp.route("/create", methods=("GET", "POST"))
@account_required
def create():
    """Show the create-capsul form and, on POST, validate it and create the capsul.

    Validation errors are flashed and the form is re-rendered. On success the
    capsul is recorded through the db model, then created through the
    virtualization model, and ``created_os`` is passed to the template.
    """
    db_model = get_model()
    vm_sizes = db_model.vm_sizes_dict()
    operating_systems = db_model.operating_systems_dict()
    ssh_public_keys = db_model.list_ssh_public_keys_for_account(session["account"])
    errors = list()
    created_os = None

    if request.method == "POST":
        size = request.form["size"]
        os = request.form["os"]

        if not size:
            errors.append("Size is required")
        elif size not in vm_sizes:
            errors.append(f"Invalid size {size}")

        if not os:
            errors.append("OS is required")
        elif os not in operating_systems:
            errors.append(f"Invalid os {os}")

        # A non-numeric count is a malformed form, not a server error.
        try:
            posted_keys_count = int(request.form["ssh_public_key_count"])
        except ValueError:
            posted_keys_count = None

        posted_keys = list()

        if posted_keys_count is None or posted_keys_count > 1000:
            errors.append("something went wrong with ssh keys")
        else:
            # Later entries win on duplicate names, matching a last-match linear scan.
            keys_by_name = {x["name"]: x for x in ssh_public_keys}
            for i in range(0, posted_keys_count):
                if f"ssh_key_{i}" in request.form:
                    posted_name = request.form[f"ssh_key_{i}"]
                    key = keys_by_name.get(posted_name)
                    if key:
                        posted_keys.append(key)
                    else:
                        errors.append(f"SSH Key \"{posted_name}\" doesn't exist")

        if len(posted_keys) == 0:
            errors.append("At least one SSH Public Key is required")

        if len(errors) == 0:
            id = makeCapsulId()
            db_model.create_vm(
                email=session["account"],
                id=id,
                size=size,
                os=os,
                ssh_public_keys=[x["name"] for x in posted_keys],
            )
            current_app.config["VIRTUALIZATION_MODEL"].create(
                email=session["account"],
                id=id,
                template_image_file_name=operating_systems[os]["template_image_file_name"],
                vcpus=vm_sizes[size]["vcpus"],
                memory_mb=vm_sizes[size]["memory_mb"],
                ssh_public_keys=[x["content"] for x in posted_keys],
            )
            created_os = os

    for error in errors:
        flash(error)

    return render_template(
        "create-capsul.html",
        created_os=created_os,
        ssh_public_keys=ssh_public_keys,
        ssh_public_key_count=len(ssh_public_keys),
        has_ssh_public_keys=len(ssh_public_keys) > 0,
        operating_systems=operating_systems,
        vm_sizes=vm_sizes,
    )


@bp.route("/ssh", methods=("GET", "POST"))
@account_required
def ssh_public_keys():
    """List the account's SSH public keys and handle add/delete form posts.

    The posted ``method`` field selects the action: ``"POST"`` adds a key,
    ``"DELETE"`` removes the key with the posted name. Validation errors are
    flashed; the key list is rendered in every case.
    """
    db_model = get_model()
    errors = list()

    if request.method == "POST":
        method = request.form["method"]
        name = request.form["name"]

        if not name or len(name.strip()) < 1:
            errors.append("Name is required")
        elif not re.match(r"^[0-9A-Za-z_ -]+$", name):
            errors.append("Name must match \"^[0-9A-Za-z_ -]+$\"")

        if method == "POST":
            content = request.form["content"]
            if not content or len(content.strip()) < 1:
                errors.append("Content is required")
            else:
                # Keys pasted from a terminal may be wrapped; drop line breaks before validating.
                content = content.replace("\r", "").replace("\n", "")
                if not re.match(r"^(ssh|ecdsa)-[0-9A-Za-z+/_=@. -]+$", content):
                    errors.append("Content must match \"^(ssh|ecdsa)-[0-9A-Za-z+/_=@. -]+$\"")

            if db_model.ssh_public_key_name_exists(session["account"], name):
                errors.append("A key with that name already exists")

            if len(errors) == 0:
                db_model.create_ssh_public_key(session["account"], name, content)

        elif method == "DELETE":
            if len(errors) == 0:
                db_model.delete_ssh_public_key(session["account"], name)

    for error in errors:
        flash(error)

    # Only the first and last 20 characters of each key are shown on the page.
    keys_list = [
        dict(
            name=x["name"],
            content=f"{x['content'][:20]}...{x['content'][len(x['content'])-20:]}",
        )
        for x in db_model.list_ssh_public_keys_for_account(session["account"])
    ]

    return render_template(
        "ssh-public-keys.html",
        ssh_public_keys=keys_list,
        has_ssh_public_keys=len(keys_list) > 0,
    )


@bp.route("/billing")
@account_required
def faq():
    """Render the billing page (endpoint name is ``console.faq``)."""
    return render_template("billing.html")