import subprocess import re from flask import current_app from time import sleep from os.path import join from subprocess import run from capsulflask.db import get_model def validate_capsul_id(id): if not re.match(r"^capsul-[a-z0-9]{10}$", id): raise ValueError(f"vm id \"{id}\" must match \"^capsul-[a-z0-9]{{10}}$\"") class VirtualMachine: def __init__(self, id, ipv4=None, ipv6=None): self.id = id self.ipv4 = ipv4 self.ipv6 = ipv6 class VirtualizationInterface: def get(self, id: str) -> VirtualMachine: pass def list_ids(self) -> list: pass def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory: int, ssh_public_keys: list) -> VirtualMachine: pass def destroy(self, email: str, id: str): pass class MockVirtualization(VirtualizationInterface): def get(self, id): validate_capsul_id(id) return VirtualMachine(id, ipv4="1.1.1.1") def list_ids(self) -> list: return get_model().all_vm_ids() def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_public_keys: list): validate_capsul_id(id) print(f"mock create: {id} for {email}") sleep(5) return VirtualMachine(id, ipv4="1.1.1.1") def destroy(self, email: str, id: str): print(f"mock destroy: {id} for {email}") class ShellScriptVirtualization(VirtualizationInterface): def validate_completed_process(self, completedProcess, email=None): emailPart = "" if email != None: emailPart = f"for {email}" if completedProcess.returncode != 0: raise RuntimeError(f"""{" ".join(completedProcess.args)} failed {emailPart} with exit code {completedProcess.returncode} stdout: {completedProcess.stdout} stderr: {completedProcess.stderr} """) def get(self, id): validate_capsul_id(id) completedProcess = run([join(current_app.root_path, 'shell_scripts/get.sh'), id], capture_output=True) self.validate_completed_process(completedProcess) lines = completedProcess.stdout.splitlines() if not re.match(r"^([0-9]{1,3}\.){3}[0-9]{1,3}$", lines[0]): return None return VirtualMachine(id, ipv4=lines[0]) def list_ids(self) -> list: completedProcess = run([join(current_app.root_path, 'shell_scripts/list_ids.sh')], capture_output=True) self.validate_completed_process(completedProcess) return completedProcess.stdout.splitlines() def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_public_keys: list): validate_capsul_id(id) if not re.match(r"^[a-zA-Z0-9_.-]$", template_image_file_name): raise ValueError(f"template_image_file_name \"{template_image_file_name}\" must match \"^[a-zA-Z0-9_.-]$\"") for ssh_public_key in ssh_public_keys: if not re.match(r"^(ssh|ecdsa)-[0-9A-Za-z+/_=@ -]+$", ssh_public_key): raise ValueError(f"ssh_public_key \"{ssh_public_key}\" must match \"^(ssh|ecdsa)-[0-9A-Za-z+/_=@ -]+$\"") if vcpus < 1 or vcpus > 8: raise ValueError(f"vcpus \"{vcpus}\" must match 1 <= vcpus <= 8") if memory_mb < 512 or memory_mb > 16384: raise ValueError(f"memory_mb \"{memory_mb}\" must match 512 <= memory_mb <= 16384") ssh_keys_string = "\n".join(ssh_public_keys) completedProcess = run([ join(current_app.root_path, 'shell_scripts/create.sh'), id, template_image_file_name, str(vcpus), str(memory_mb), ssh_keys_string ], capture_output=True) self.validate_completed_process(completedProcess, email) lines = completedProcess.stdout.splitlines() vmSettings = f""" id={id} template_image_file_name={template_image_file_name} vcpus={str(vcpus)} memory={str(memory_mb)} ssh_public_keys={ssh_keys_string} """ if not lines[len(lines)-1] == "success": raise ValueError(f"""failed to create vm for {email} with: {vmSettings} stdout: {completedProcess.stdout} stderr: {completedProcess.stderr} """) for _ in range(0, 10): sleep(6) result = self.get(id) if result != None: return result for _ in range(0, 10): sleep(60) result = self.get(id) if result != None: return result raise TimeoutError(f"""timed out waiting for vm {id} ({email}) to obtain an IP address: {vmSettings} """) def destroy(self, email: str, id: str): validate_capsul_id(id) completedProcess = run([join(current_app.root_path, 'shell_scripts/destroy.sh'), id], capture_output=True) self.validate_completed_process(completedProcess, email) lines = completedProcess.stdout.splitlines() if not lines[len(lines)-1] == "success": raise ValueError(f"""failed to destroy vm "{id}" for {email}: stdout: {completedProcess.stdout} stderr: {completedProcess.stderr} """)