"""Hub-side implementations of ``VirtualizationInterface``.

``MockHub`` answers from the local model and fixed values without contacting
any host; ``CapsulFlaskHub`` forwards each operation to hosts through the HTTP
client stored in the Flask app config.
"""

import asyncio
import json
import re
import subprocess
import sys
from os.path import join
from subprocess import run
from time import sleep
from typing import List, Optional, Tuple

import aiohttp
import requests
from flask import current_app

from capsulflask.db import get_model
from capsulflask.http_client import HTTPResult
from capsulflask.shared import VirtualizationInterface, VirtualMachine, OnlineHost, validate_capsul_id, my_exec_info_message


# Fixed host keys handed out by MockHub.get(); parsed with json.loads on each
# call so every caller receives its own list.
_MOCK_SSH_HOST_KEYS_JSON = """[
  {"key_type":"ED25519", "content":"ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIN8cna0zeKSKl/r8whdn/KmDWhdzuWRVV0GaKIM+eshh", "sha256":"V4X2apAF6btGAfS45gmpldknoDX0ipJ5c6DLfZR2ttQ"},
  {"key_type":"RSA", "content":"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCvotgzgEP65JUQ8S8OoNKy1uEEPEAcFetSp7QpONe6hj4wPgyFNgVtdoWdNcU19dX3hpdse0G8OlaMUTnNVuRlbIZXuifXQ2jTtCFUA2mmJ5bF+XjGm3TXKMNGh9PN+wEPUeWd14vZL+QPUMev5LmA8cawPiU5+vVMLid93HRBj118aCJFQxLgrdP48VPfKHFRfCR6TIjg1ii3dH4acdJAvlmJ3GFB6ICT42EmBqskz2MPe0rIFxH8YohCBbAbrbWYcptHt4e48h4UdpZdYOhEdv89GrT8BF2C5cbQ5i9qVpI57bXKrj8hPZU5of48UHLSpXG8mbH0YDiOQOfKX/Mt", "sha256":"ghee6KzRnBJhND2kEUZSaouk7CD6o6z2aAc8GPkV+GQ"},
  {"key_type":"ECDSA", "content":"ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBLLgOoATz9R4aS2kk7vWoxX+lshK63t9+5BIHdzZeFE1o+shlcf0Wji8cN/L1+m3bi0uSETZDOAWMP3rHLJj9Hk=", "sha256":"aCYG1aD8cv/TjzJL0bi9jdabMGksdkfa7R8dCGm1yYs"}
]"""

# The only assignment_status values accepted from a host response.
_VALID_ASSIGNMENT_STATUSES = frozenset(("assigned", "not_applicable", "assigned_to_other_host"))


def _parse_json_object(body) -> Optional[dict]:
    """Return *body* decoded as a JSON object, or None if that is not possible.

    None is returned when *body* is not valid JSON, is not a str/bytes value,
    or decodes to something other than a dict.
    """
    try:
        parsed = json.loads(body)
    except (ValueError, TypeError):
        # json.JSONDecodeError is a ValueError; TypeError covers a non-string body.
        return None
    return parsed if isinstance(parsed, dict) else None


class MockHub(VirtualizationInterface):
    """Stand-in hub that never talks to a host and reports fixed values."""

    def __init__(self):
        self.default_network = "public1"
        self.default_ipv4 = "1.1.1.1"

    def capacity_avaliable(self, additional_ram_bytes):
        """Always report that capacity is available."""
        return True

    def get(self, id, get_ssh_host_keys):
        """Return a VirtualMachine for *id* carrying the default IPv4 address.

        When *get_ssh_host_keys* is truthy, the fixed mock host keys are attached.
        """
        validate_capsul_id(id)
        spoke_host_id = current_app.config["SPOKE_HOST_ID"]

        if get_ssh_host_keys:
            ssh_host_keys = json.loads(_MOCK_SSH_HOST_KEYS_JSON)
            return VirtualMachine(id, spoke_host_id, ipv4=self.default_ipv4, ssh_host_keys=ssh_host_keys)

        return VirtualMachine(id, spoke_host_id, ipv4=self.default_ipv4)

    def get_all_by_id(self) -> dict:
        """Return every non-deleted vm from the model, keyed by vm id.

        Each vm gets a 'state' entry copied from its 'desired_state'.
        """
        by_host_and_network = get_model().non_deleted_vms_by_host_and_network(None)
        to_return = dict()
        for networks in by_host_and_network.values():
            for vms in networks.values():
                for vm in vms:
                    vm['state'] = vm['desired_state']
                    to_return[vm['id']] = vm

        return to_return

    def create(self, email: str, id: str, os: str, size: str, shortterm: bool, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_authorized_keys: list):
        """Record a new vm in the model after a one second pause.

        Only the "name" of each entry in *ssh_authorized_keys* is stored.
        """
        validate_capsul_id(id)
        current_app.logger.info(f"mock create: {id} for {email}")
        sleep(1)
        get_model().create_vm(
            email=email,
            id=id,
            size=size,
            shortterm=shortterm,
            os=os,
            host=current_app.config["SPOKE_HOST_ID"],
            network_name=self.default_network,
            public_ipv4=self.default_ipv4,
            ssh_authorized_keys=[key["name"] for key in ssh_authorized_keys],
        )

    def destroy(self, email: str, id: str):
        """Log the destroy request; nothing else happens."""
        current_app.logger.info(f"mock destroy: {id} for {email}")

    def vm_state_command(self, email: str, id: str, command: str):
        """Log the state command; nothing else happens."""
        current_app.logger.info(f"mock {command}: {id} for {email}")

    def net_set_dhcp(self, email: str, host_id: str, network_name: str, macs: list, remove_ipv4: str, add_ipv4: str):
        """Log the DHCP change request; nothing else happens."""
        current_app.logger.info(f"mock net_set_dhcp: host_id={host_id} network_name={network_name} macs={','.join(macs)} remove_ipv4={remove_ipv4} add_ipv4={add_ipv4} for {email}")


class CapsulFlaskHub(VirtualizationInterface):
    """Hub that sends each operation to hosts over HTTP and reads their JSON replies."""

    def synchronous_operation(self, hosts: List[OnlineHost], email: str, payload: str) -> List[HTTPResult]:
        """Send *payload* to *hosts* in immediate mode and return only the results."""
        return self.generic_operation(hosts, email, payload, True)[1]

    def asynchronous_operation(self, hosts: List[OnlineHost], email: str, payload: str) -> Tuple[int, List[HTTPResult]]:
        """Send *payload* to *hosts* as a recorded operation; return (operation_id, results)."""
        return self.generic_operation(hosts, email, payload, False)

    def generic_operation(self, hosts: List[OnlineHost], email: str, payload: str, immediate_mode: bool) -> Tuple[int, List[HTTPResult]]:
        """Post *payload* to every host and classify each host's response.

        In immediate mode nothing is written to the model and the returned
        operation id is None. Otherwise an operation is created in the model
        first (which requires *email*), its id is appended to the URL path,
        and each host's assignment status is stored afterwards.

        Raises:
            ValueError: if *immediate_mode* is false and *email* is empty or None.
        """
        url_path = "/spoke/operation"
        operation_id = None
        if not immediate_mode:
            if not email:
                raise ValueError("can't create_operation in the db cuz no email was provided")
            operation_id = get_model().create_operation(hosts, email, payload)
            url_path = f"/spoke/operation/{operation_id}"

        authorization_header = f"Bearer {current_app.config['HUB_TOKEN']}"
        results = current_app.config["HTTP_CLIENT"].do_multi_http_sync(hosts, url_path, payload, authorization_header=authorization_header)

        # NOTE(review): results are paired with hosts by position, as the
        # original index loop did -- confirm do_multi_http_sync keeps host order.
        for host, result in zip(hosts, results):
            if result.status_code == -1:
                assignment_status = "no_response_from_host"
            elif result.status_code != 200:
                # Must be `elif`: a plain `if` here overwrote the -1 case above,
                # so "no_response_from_host" could never be recorded.
                assignment_status = "error_response_from_host"
            else:
                assignment_status = self._read_assignment_status(host, result, operation_id)

            if not immediate_mode:
                get_model().update_host_operation(host.id, operation_id, assignment_status, None)

        return (operation_id, results)

    def _read_assignment_status(self, host, result, operation_id) -> str:
        """Extract a valid assignment_status from a 200 response, logging when there is none.

        Returns the host's status when it is one of the accepted values,
        otherwise "invalid_response_from_host".
        """
        result_is_json = False
        result_is_dict = False
        result_has_status = False
        result_has_valid_status = False
        assignment_status = "invalid_response_from_host"
        error_message = ""
        try:
            result_body = json.loads(result.body)
            result_is_json = True
            result_is_dict = isinstance(result_body, dict)
            result_has_status = result_is_dict and 'assignment_status' in result_body
            result_has_valid_status = result_has_status and result_body['assignment_status'] in _VALID_ASSIGNMENT_STATUSES
            if result_has_valid_status:
                assignment_status = result_body['assignment_status']
            if result_is_dict and "error_message" in result_body:
                error_message = result_body['error_message']
        except (ValueError, TypeError):
            # Undecodable body, or an unhashable assignment_status value: the
            # flags set so far describe how far parsing got and are logged below.
            pass

        if not result_has_valid_status:
            operation_desc = ""
            if operation_id:
                operation_desc = f"for operation {operation_id}"
            current_app.logger.error(
                f"error reading assignment_status {operation_desc} from host {host.id}: "
                f"result_is_json: {result_is_json} "
                f"result_is_dict: {result_is_dict} "
                f"result_has_status: {result_has_status} "
                f"result_has_valid_status: {result_has_valid_status} "
                f"error_message: {error_message} "
            )

        return assignment_status

    def _single_host_status(self, host, email: str, payload: str) -> Tuple[Optional[str], str]:
        """Run *payload* on one host; return (status, raw body of the last result).

        status is the 'status' field of the last response that was a JSON
        object containing one, or None when no response had it.
        """
        result_status = None
        result_json_string = ""
        for result in self.synchronous_operation([host], email, payload):
            result_json_string = result.body
            result_body = _parse_json_object(result_json_string)
            if result_body is not None and 'status' in result_body:
                result_status = result_body['status']

        return result_status, result_json_string

    def capacity_avaliable(self, additional_ram_bytes):
        """Return True if any online host answers that it has capacity available."""
        online_hosts = get_model().get_online_hosts()
        payload = json.dumps(dict(type="capacity_avaliable", additional_ram_bytes=additional_ram_bytes))
        results = self.synchronous_operation(online_hosts, None, payload)
        for result in results:
            result_body = _parse_json_object(result.body)
            # Loose equality is kept on purpose so the accepted values stay
            # exactly what they were before.
            if result_body is not None and result_body.get('capacity_avaliable') == True:  # noqa: E712
                return True

        return False

    def get(self, id, get_ssh_host_keys) -> Optional[VirtualMachine]:
        """Ask the capsul's host for its current state.

        Returns None when the model knows no host for *id* or when no response
        could be turned into a VirtualMachine.
        """
        validate_capsul_id(id)
        host = get_model().host_of_capsul(id)
        if host is None:
            return None

        payload = json.dumps(dict(type="get", id=id, get_ssh_host_keys=get_ssh_host_keys))
        for result in self.synchronous_operation([host], None, payload):
            result_body = _parse_json_object(result.body)
            if result_body is None or 'state' not in result_body:
                continue
            try:
                return VirtualMachine(id, host=host, state=result_body['state'], ipv4=result_body['ipv4'], ipv6=result_body['ipv6'], ssh_host_keys=result_body['ssh_host_keys'])
            except Exception as error:
                # Best effort, as before: an incomplete response yields None
                # rather than an exception, but it is no longer silent.
                current_app.logger.warning(f"could not build VirtualMachine for {id} from host response: {error!r}")

        return None

    def get_all_by_id(self) -> dict:
        """Collect the vms reported by every online host, keyed by vm id.

        Each vm dict gets a 'host' entry holding the id of the reporting host.
        Responses that cannot be parsed are skipped.
        """
        online_hosts = get_model().get_online_hosts()
        payload = json.dumps(dict(type="get_all_by_id"))
        results = self.synchronous_operation(online_hosts, None, payload)
        to_return = dict()
        for host, result in zip(online_hosts, results):
            # Nothing to report for unparseable bodies: generic_operation has
            # already logged them.
            result_body = _parse_json_object(result.body)
            if result_body is None:
                continue
            vms = result_body.get('vms')
            if not isinstance(vms, dict):
                continue
            for vm_id, vm in vms.items():
                if not isinstance(vm, dict):
                    continue
                vm['host'] = host.id
                to_return[vm_id] = vm

        return to_return

    def create(self, email: str, id: str, os: str, size: str, shortterm: bool, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_authorized_keys: list):
        """Offer a create operation to all online hosts; exactly one must accept it.

        Raises:
            ValueError: if the operation was not assigned to exactly one host,
                or if any host response carried a non-empty error_message.
        """
        validate_capsul_id(id)
        online_hosts = get_model().get_online_hosts()
        payload = json.dumps(dict(
            type="create",
            email=email,
            id=id,
            os=os,
            size=size,
            shortterm=shortterm,
            template_image_file_name=template_image_file_name,
            vcpus=vcpus,
            memory_mb=memory_mb,
            ssh_authorized_keys=ssh_authorized_keys,
        ))
        operation_id, results = self.asynchronous_operation(online_hosts, email, payload)

        error_message = ""
        assigned_hosts = []
        for host, result in zip(online_hosts, results):
            # Nothing to report for unparseable bodies: generic_operation has
            # already logged them.
            result_body = _parse_json_object(result.body)
            if result_body is None:
                continue
            if result_body.get('assignment_status') == "assigned":
                assigned_hosts.append(host.id)
            if 'error_message' in result_body:
                error_message = result_body['error_message']

        number_of_assigned = len(assigned_hosts)
        assigned_hosts_string = ", ".join(assigned_hosts)
        if number_of_assigned != 1:
            raise ValueError(f"expected create capsul operation {operation_id} to be assigned to one host, it was assigned to {number_of_assigned} ({assigned_hosts_string})")
        if error_message != "":
            raise ValueError(f"create capsul operation {operation_id} on {assigned_hosts_string} failed with {error_message}")

    def destroy(self, email: str, id: str):
        """Destroy the capsul on its host.

        Raises:
            ValueError: if the model knows no host for *id*, or the host did
                not answer with status "success".
        """
        validate_capsul_id(id)
        host = get_model().host_of_capsul(id)
        if host is None:
            # Previously this path failed while formatting `host.id` below.
            raise ValueError(f"""failed to destroy vm "{id}" for {email}: no host found for this capsul""")

        payload = json.dumps(dict(type="destroy", email=email, id=id))
        result_status, result_json_string = self._single_host_status(host, email, payload)
        if result_status != "success":
            raise ValueError(f"""failed to destroy vm "{id}" on host "{host.id}" for {email}: {result_json_string}""")

    def vm_state_command(self, email: str, id: str, command: str):
        """Send a state *command* for the capsul to its host.

        Raises:
            ValueError: if the model knows no host for *id*, or the host did
                not answer with status "success".
        """
        validate_capsul_id(id)
        host = get_model().host_of_capsul(id)
        if host is None:
            # Previously this path failed while formatting `host.id` below.
            raise ValueError(f"""failed to {command} vm "{id}" for {email}: no host found for this capsul""")

        payload = json.dumps(dict(type="vm_state_command", email=email, id=id, command=command))
        result_status, result_json_string = self._single_host_status(host, email, payload)
        if result_status != "success":
            raise ValueError(f"""failed to {command} vm "{id}" on host "{host.id}" for {email}: {result_json_string}""")

    def net_set_dhcp(self, email: str, host_id: str, network_name: str, macs: list, remove_ipv4: str, add_ipv4: str):
        """Send a net_set_dhcp request for *network_name* to the host *host_id*.

        Raises:
            ValueError: if the model knows no host with *host_id*, or the host
                did not answer with status "success".
        """
        host = get_model().host_by_id(host_id)
        if host is None:
            # Previously this path failed while formatting `host.id` below.
            raise ValueError(f"""failed to net_set_dhcp on host "{host_id}" for {email}: host not found""")

        payload = json.dumps(dict(type="net_set_dhcp", email=email, network_name=network_name, macs=macs, remove_ipv4=remove_ipv4, add_ipv4=add_ipv4))
        result_status, result_json_string = self._single_host_status(host, email, payload)
        if result_status != "success":
            raise ValueError(f"""failed to net_set_dhcp on host "{host.id}" for {email}: {result_json_string}""")