"""Hub-side model: fans operations out to spoke hosts over HTTP and records the outcome."""

import asyncio
import json
import re
import subprocess
import sys
from os.path import join
from subprocess import run
from time import sleep
from typing import List, Optional, Tuple

import aiohttp
import requests
from flask import current_app

from capsulflask.db import get_model, my_exec_info_message
from capsulflask.db_model import OnlineHost
from capsulflask.spoke_model import VirtualMachine
from capsulflask.spoke_model import validate_capsul_id

# assignment_status values that generic_operation accepts from a spoke response.
_VALID_ASSIGNMENT_STATUSES = frozenset({"assigned", "not_applicable", "assigned_to_other_host"})

# Everything json.loads can raise for a malformed, non-text or absurdly nested body.
_JSON_PARSE_ERRORS = (TypeError, ValueError, RecursionError)


def _try_parse_json(body) -> Tuple[bool, object]:
    """Parse *body* as JSON; return ``(True, value)``, or ``(False, None)`` if it cannot be parsed."""
    try:
        return True, json.loads(body)
    except _JSON_PARSE_ERRORS:
        return False, None


def _parse_json_dict(body) -> Optional[dict]:
    """Return *body* parsed as a JSON object, or ``None`` if it is unparseable or not an object."""
    parsed_ok, value = _try_parse_json(body)
    return value if parsed_ok and isinstance(value, dict) else None


class HTTPResult:
    """Outcome of one HTTP request: a status code plus the raw response body."""

    def __init__(self, status_code, body=None):
        # CapsulFlaskHub.post_json uses status_code -1 when the request itself failed.
        self.status_code = status_code
        self.body = body


class HubInterface:
    """Operations a hub implementation provides; every method here is a no-op placeholder."""

    def capacity_avaliable(self, additional_ram_bytes: int) -> bool:
        pass

    def get(self, id: str) -> VirtualMachine:
        pass

    def list_ids(self) -> list:
        pass

    # NOTE(review): the implementations below name the fifth parameter `memory_mb`,
    # not `memory` -- left unchanged here so existing callers keep working.
    def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory: int, ssh_public_keys: list):
        pass

    def destroy(self, email: str, id: str):
        pass


class MockHub(HubInterface):
    """Hub implementation that contacts no spoke host and returns canned answers."""

    def capacity_avaliable(self, additional_ram_bytes):
        """Always report that capacity is available."""
        return True

    def get(self, id):
        """Validate *id* and return a VirtualMachine with the fixed address 1.1.1.1."""
        validate_capsul_id(id)
        return VirtualMachine(id, current_app.config["SPOKE_HOST_ID"], ipv4="1.1.1.1")

    def list_ids(self) -> list:
        """Return the ids of all non-deleted VMs known to the database model."""
        return get_model().all_non_deleted_vm_ids()

    def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_public_keys: list):
        """Validate *id*, log the request and pause for one second; nothing is created."""
        validate_capsul_id(id)
        current_app.logger.info(f"mock create: {id} for {email}")
        sleep(1)

    def destroy(self, email: str, id: str):
        """Log the request; nothing is destroyed."""
        current_app.logger.info(f"mock destroy: {id} for {email}")


class CapsulFlaskHub(HubInterface):
    """Hub implementation that sends each operation to spoke hosts over HTTP.

    Every public operation is a coroutine and must be awaited.
    """

    async def post_json(self, method: str, url: str, body: str, session: aiohttp.ClientSession) -> HTTPResult:
        """Send *body* to *url* and return the response as an HTTPResult.

        Never raises for request failures: if the request cannot be completed the
        result has status_code -1, and if the response body cannot be read the
        result keeps the real status. In both cases the body is a JSON object
        with an ``error_message`` key.
        """
        response = None
        try:
            # NOTE(review): callers pass an already-serialized JSON string, so `json=`
            # encodes it a second time -- confirm the spoke expects that.
            response = await session.request(
                method=method,
                url=url,
                json=body,
                auth=aiohttp.BasicAuth("hub", current_app.config['HUB_TOKEN']),
                verify_ssl=True,
            )
        except Exception:
            # `Exception` rather than a bare except, so task cancellation still propagates.
            error_message = my_exec_info_message(sys.exc_info())
            response_body = json.dumps({"error_message": f"error contacting spoke: {error_message}"})
            current_app.logger.error(f"""
                error contacting spoke: post_json (HTTP {method} {url}) failed with: {error_message}"""
            )
            return HTTPResult(-1, response_body)

        response_body = None
        try:
            response_body = await response.text()
        except Exception:
            error_message = my_exec_info_message(sys.exc_info())
            response_body = json.dumps({"error_message": f"error reading response from spoke: {error_message}"})
            current_app.logger.error(f"""
                error reading response from spoke: HTTP {method} {url} (status {response.status}) failed with: {error_message}"""
            )

        return HTTPResult(response.status, response_body)

    async def make_requests(self, online_hosts: List[OnlineHost], body: str, timeout_seconds: int = 5) -> List[HTTPResult]:
        """POST *body* to every host concurrently.

        Returns one HTTPResult per host, in the same order as *online_hosts*.
        *timeout_seconds* is the total timeout applied to each request.
        """
        timeout = aiohttp.ClientTimeout(total=timeout_seconds)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            # Build the tasks in host order: gather returns its results in the order
            # the awaitables were passed in, which keeps results aligned with hosts.
            tasks = [
                self.post_json(method="POST", url=host.url, body=body, session=session)
                for host in online_hosts
            ]
            results = await asyncio.gather(*tasks)

        return results

    async def generic_operation(self, hosts: List[OnlineHost], payload: str, immediate_mode: bool) -> Tuple[int, List[HTTPResult]]:
        """Record an operation, send it to *hosts* and store each host's assignment status.

        When *immediate_mode* is true, the body of every HTTP 200 response is also
        stored as that host's task result.

        Returns ``(operation_id, results)`` with *results* in the same order as *hosts*.
        """
        operation_id = get_model().create_operation(hosts, payload)
        results = await self.make_requests(hosts, payload)

        for host, result in zip(hosts, results):
            task_result = None
            if result.status_code == -1:
                assignment_status = "no_response_from_host"
            elif result.status_code != 200:
                # `elif`: a failed request (-1) must not be relabelled as an error response.
                assignment_status = "error_response_from_host"
            else:
                assignment_status = "invalid_response_from_host"
                if immediate_mode:
                    task_result = result.body

                result_is_json, result_body = _try_parse_json(result.body)
                result_is_dict = isinstance(result_body, dict)
                result_has_status = result_is_dict and 'assignment_status' in result_body
                reported_status = result_body['assignment_status'] if result_has_status else None
                # The isinstance check keeps an unhashable value (list/dict) from raising here.
                result_has_valid_status = (
                    isinstance(reported_status, str) and reported_status in _VALID_ASSIGNMENT_STATUSES
                )

                if result_has_valid_status:
                    assignment_status = reported_status
                else:
                    current_app.logger.error(f"""error reading assignment_status for operation {operation_id} from host {host.id}:
                        result_is_json: {result_is_json}
                        result_is_dict: {result_is_dict}
                        result_has_status: {result_has_status}
                        result_has_valid_status: {result_has_valid_status}
                        """
                    )

            get_model().update_host_operation(host.id, operation_id, assignment_status, task_result)

        return operation_id, results

    async def capacity_avaliable(self, additional_ram_bytes):
        """Return True if any online host reports ``capacity_avaliable`` for the extra RAM."""
        online_hosts = get_model().get_online_hosts()
        payload = json.dumps(dict(type="capacity_avaliable", additional_ram_bytes=additional_ram_bytes))
        _, results = await self.generic_operation(online_hosts, payload, True)

        for result in results:
            result_body = _parse_json_dict(result.body)
            # `== True` is kept deliberately: it accepts exactly what the previous check accepted.
            if result_body is not None and result_body.get('capacity_avaliable') == True:  # noqa: E712
                return True

        return False

    async def get(self, id) -> VirtualMachine:
        """Ask the capsul's host for its addresses.

        Returns a VirtualMachine, or None when the capsul has no host or the host's
        response carries neither an ``ipv4`` nor an ``ipv6`` key.

        Raises whatever validate_capsul_id raises for a rejected *id*.
        """
        validate_capsul_id(id)
        host = get_model().host_of_capsul(id)
        if host is not None:
            payload = json.dumps(dict(type="get", id=id))
            _, results = await self.generic_operation([host], payload, True)
            for result in results:
                result_body = _parse_json_dict(result.body)
                if result_body is not None and ('ipv4' in result_body or 'ipv6' in result_body):
                    # .get(): a response with only one address family is accepted by the
                    # condition above, so the missing one is passed as None.
                    # NOTE(review): MockHub passes a host id string as the second argument;
                    # confirm whether `host` or `host.id` is expected here.
                    return VirtualMachine(
                        id,
                        host=host,
                        ipv4=result_body.get('ipv4'),
                        ipv6=result_body.get('ipv6'),
                    )

        return None

    async def list_ids(self) -> list:
        """Collect capsul ids from every online host.

        Ids that pass validate_capsul_id are returned; each host's outcome is
        written back to its host-operation record.
        """
        online_hosts = get_model().get_online_hosts()
        payload = json.dumps(dict(type="list_ids"))
        operation_id, results = await self.generic_operation(online_hosts, payload, False)

        to_return = []
        for host, result in zip(online_hosts, results):
            parsed_ok, result_body = _try_parse_json(result.body)
            if not parsed_ok:
                # no need to do anything here since if it cant be parsed then generic_operation will handle it.
                continue

            if isinstance(result_body, dict) and isinstance(result_body.get('ids'), list):
                all_valid = True
                for capsul_id in result_body['ids']:
                    try:
                        validate_capsul_id(capsul_id)
                        to_return.append(capsul_id)
                    except Exception:
                        all_valid = False

                if all_valid:
                    get_model().update_host_operation(host.id, operation_id, None, result.body)
                else:
                    result_json_string = json.dumps({"error_message": "invalid capsul id returned"})
                    get_model().update_host_operation(host.id, operation_id, None, result_json_string)
                    current_app.logger.error(f"""error reading ids for list_ids operation {operation_id}, host {host.id}""")
            else:
                result_json_string = json.dumps({"error_message": "invalid response, missing 'ids' list"})
                get_model().update_host_operation(host.id, operation_id, "invalid_response_from_host", result_json_string)
                current_app.logger.error(f"""missing 'ids' list for list_ids operation {operation_id}, host {host.id}""")

        return to_return

    async def create(self, email: str, id: str, template_image_file_name: str, vcpus: int, memory_mb: int, ssh_public_keys: list):
        """Offer a create operation to every online host.

        Raises ValueError unless exactly one host answers ``assigned``; also raises
        whatever validate_capsul_id raises for a rejected *id*.
        """
        validate_capsul_id(id)
        online_hosts = get_model().get_online_hosts()
        payload = json.dumps(dict(
            type="create",
            email=email,
            id=id,
            template_image_file_name=template_image_file_name,
            vcpus=vcpus,
            memory_mb=memory_mb,
            ssh_public_keys=ssh_public_keys,
        ))
        operation_id, results = await self.generic_operation(online_hosts, payload, False)

        assigned_hosts = []
        for host, result in zip(online_hosts, results):
            # Unparseable bodies are skipped here; generic_operation has already recorded them.
            result_body = _parse_json_dict(result.body)
            if result_body is not None and result_body.get('assignment_status') == "assigned":
                assigned_hosts.append(host.id)

        number_of_assigned = len(assigned_hosts)
        if number_of_assigned != 1:
            assigned_hosts_string = ", ".join(assigned_hosts)
            raise ValueError(f"expected create capsul operation {operation_id} to be assigned to one host, it was assigned to {number_of_assigned} ({assigned_hosts_string})")

    async def destroy(self, email: str, id: str):
        """Ask the capsul's host to destroy it.

        Raises ValueError unless the host answers with ``status`` equal to
        ``"success"`` -- including when the capsul has no host. Also raises
        whatever validate_capsul_id raises for a rejected *id*.
        """
        validate_capsul_id(id)
        result_status = None
        # Bound before the branch so the failure message below can always be formatted.
        result_json_string = ""
        host = get_model().host_of_capsul(id)
        if host is not None:
            payload = json.dumps(dict(type="destroy", id=id))
            _, results = await self.generic_operation([host], payload, True)
            for result in results:
                result_json_string = result.body
                result_body = _parse_json_dict(result_json_string)
                if result_body is not None and 'status' in result_body:
                    result_status = result_body['status']

        if not result_status == "success":
            raise ValueError(f"""failed to destroy vm "{id}" on host "{host}" for {email}: {result_json_string}""")